1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12 D: serde::Deserializer<'de>,
13{
14 use serde::de::{self};
15
16 let s = String::deserialize(deserializer)?;
17
18 if !s.chars().all(|c| c.is_digit(8)) {
20 return Err(de::Error::custom(format!(
21 "invalid octal permission mode '{}': must contain only digits 0-7",
22 s
23 )));
24 }
25
26 let mode = u32::from_str_radix(&s, 8)
28 .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30 if mode > 0o777 {
32 return Err(de::Error::custom(format!(
33 "permission mode '{}' exceeds maximum value 777",
34 s
35 )));
36 }
37
38 Ok(mode)
39}
40
41fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43 driver_str: String,
44 default_driver: T,
45 driver_name: &str,
46) -> T
47where
48 <T as FromStr>::Err: std::fmt::Debug,
49{
50 match T::from_str(&driver_str.to_lowercase()) {
51 Ok(driver_enum) => driver_enum,
52 Err(e) => {
53 warn!(
54 "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55 driver_name, driver_str, e, default_driver
56 );
57 default_driver
58 }
59 }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63 if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64 db_conn.pool_min = Some(min);
65 }
66 if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67 db_conn.pool_max = Some(max);
68 }
69}
70
/// Selects the adapter backend; `Local` is the default variant.
///
/// Serde uses the lowercase variant name, except for the two explicitly renamed
/// variants `redis-cluster` and `google-pubsub`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum AdapterDriver {
    #[default]
    Local,
    Redis,
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    Nats,
    Pulsar,
    // Serialized as `rabbitmq` via `rename_all = "lowercase"`.
    RabbitMq,
    #[serde(rename = "google-pubsub")]
    GooglePubSub,
    Kafka,
}
88
/// DynamoDB settings (embedded in `DatabaseConfig::dynamodb`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DynamoDbSettings {
    /// Defaults to `us-east-1`.
    pub region: String,
    /// Defaults to `sockudo-applications`.
    pub table_name: String,
    // NOTE(review): presumably an endpoint override for local/emulated DynamoDB — confirm with the consumer.
    pub endpoint_url: Option<String>,
    pub aws_access_key_id: Option<String>,
    pub aws_secret_access_key: Option<String>,
    pub aws_profile_name: Option<String>,
}
99
100impl Default for DynamoDbSettings {
101 fn default() -> Self {
102 Self {
103 region: "us-east-1".to_string(),
104 table_name: "sockudo-applications".to_string(),
105 endpoint_url: None,
106 aws_access_key_id: None,
107 aws_secret_access_key: None,
108 aws_profile_name: None,
109 }
110 }
111}
112
/// ScyllaDB settings (embedded in `AppManagerConfig::scylladb` and
/// `DatabaseConfig::scylladb`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ScyllaDbSettings {
    /// Defaults to the single entry `127.0.0.1:9042`.
    pub nodes: Vec<String>,
    /// Defaults to `sockudo`.
    pub keyspace: String,
    /// Defaults to `applications`.
    pub table_name: String,
    pub username: Option<String>,
    pub password: Option<String>,
    /// Defaults to `SimpleStrategy`.
    pub replication_class: String,
    /// Defaults to `3`.
    pub replication_factor: u32,
}
124
125impl Default for ScyllaDbSettings {
126 fn default() -> Self {
127 Self {
128 nodes: vec!["127.0.0.1:9042".to_string()],
129 keyspace: "sockudo".to_string(),
130 table_name: "applications".to_string(),
131 username: None,
132 password: None,
133 replication_class: "SimpleStrategy".to_string(),
134 replication_factor: 3,
135 }
136 }
137}
138
/// SurrealDB settings (embedded in `DatabaseConfig::surrealdb`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SurrealDbSettings {
    /// Defaults to `ws://127.0.0.1:8000`.
    pub url: String,
    /// Defaults to `sockudo`.
    pub namespace: String,
    /// Defaults to `sockudo`.
    pub database: String,
    /// Defaults to `root`.
    pub username: String,
    /// Defaults to `root`.
    pub password: String,
    /// Defaults to `applications`.
    pub table_name: String,
    /// Defaults to `300`. NOTE(review): unit is presumably seconds — confirm with the consumer.
    pub cache_ttl: u64,
    /// Defaults to `100`.
    pub cache_max_capacity: u64,
}
151
152impl Default for SurrealDbSettings {
153 fn default() -> Self {
154 Self {
155 url: "ws://127.0.0.1:8000".to_string(),
156 namespace: "sockudo".to_string(),
157 database: "sockudo".to_string(),
158 username: "root".to_string(),
159 password: "root".to_string(),
160 table_name: "applications".to_string(),
161 cache_ttl: 300,
162 cache_max_capacity: 100,
163 }
164 }
165}
166
167impl FromStr for AdapterDriver {
168 type Err = String;
169 fn from_str(s: &str) -> Result<Self, Self::Err> {
170 match s.to_lowercase().as_str() {
171 "local" => Ok(AdapterDriver::Local),
172 "redis" => Ok(AdapterDriver::Redis),
173 "redis-cluster" => Ok(AdapterDriver::RedisCluster),
174 "nats" => Ok(AdapterDriver::Nats),
175 "pulsar" => Ok(AdapterDriver::Pulsar),
176 "rabbitmq" | "rabbit-mq" => Ok(AdapterDriver::RabbitMq),
177 "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(AdapterDriver::GooglePubSub),
178 "kafka" => Ok(AdapterDriver::Kafka),
179 _ => Err(format!("Unknown adapter driver: {s}")),
180 }
181 }
182}
183
/// Selects the app-manager backend; `Memory` is the default variant.
///
/// Serde uses the lowercase variant names (`memory`, `mysql`, `dynamodb`, `pgsql`,
/// `surrealdb`, `scylladb`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum AppManagerDriver {
    #[default]
    Memory,
    Mysql,
    Dynamodb,
    PgSql,
    SurrealDb,
    ScyllaDb,
}
195impl FromStr for AppManagerDriver {
196 type Err = String;
197 fn from_str(s: &str) -> Result<Self, Self::Err> {
198 match s.to_lowercase().as_str() {
199 "memory" => Ok(AppManagerDriver::Memory),
200 "mysql" => Ok(AppManagerDriver::Mysql),
201 "dynamodb" => Ok(AppManagerDriver::Dynamodb),
202 "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
203 "surreal" | "surrealdb" => Ok(AppManagerDriver::SurrealDb),
204 "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
205 _ => Err(format!("Unknown app manager driver: {s}")),
206 }
207 }
208}
209
/// Selects the cache backend; `Memory` is the default variant.
///
/// Serde uses the lowercase variant name, except for the explicitly renamed
/// `redis-cluster`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum CacheDriver {
    #[default]
    Memory,
    Redis,
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    // This is the enum's own `None` variant, not `Option::None`.
    None,
}
220
221impl FromStr for CacheDriver {
222 type Err = String;
223 fn from_str(s: &str) -> Result<Self, Self::Err> {
224 match s.to_lowercase().as_str() {
225 "memory" => Ok(CacheDriver::Memory),
226 "redis" => Ok(CacheDriver::Redis),
227 "redis-cluster" => Ok(CacheDriver::RedisCluster),
228 "none" => Ok(CacheDriver::None),
229 _ => Err(format!("Unknown cache driver: {s}")),
230 }
231 }
232}
233
/// Selects the queue backend; `Redis` is the default variant.
///
/// Serde uses the lowercase variant name, except for the explicitly renamed
/// `redis-cluster`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum QueueDriver {
    Memory,
    #[default]
    Redis,
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    Sqs,
    Sns,
    // This is the enum's own `None` variant, not `Option::None`.
    None,
}
246
247impl FromStr for QueueDriver {
248 type Err = String;
249 fn from_str(s: &str) -> Result<Self, Self::Err> {
250 match s.to_lowercase().as_str() {
251 "memory" => Ok(QueueDriver::Memory),
252 "redis" => Ok(QueueDriver::Redis),
253 "redis-cluster" => Ok(QueueDriver::RedisCluster),
254 "sqs" => Ok(QueueDriver::Sqs),
255 "sns" => Ok(QueueDriver::Sns),
256 "none" => Ok(QueueDriver::None),
257 _ => Err(format!("Unknown queue driver: {s}")),
258 }
259 }
260}
261
262impl AsRef<str> for QueueDriver {
263 fn as_ref(&self) -> &str {
264 match self {
265 QueueDriver::Memory => "memory",
266 QueueDriver::Redis => "redis",
267 QueueDriver::RedisCluster => "redis-cluster",
268 QueueDriver::Sqs => "sqs",
269 QueueDriver::Sns => "sns",
270 QueueDriver::None => "none",
271 }
272 }
273}
274
/// Settings for the Redis Cluster queue driver (embedded in
/// `QueueConfig::redis_cluster`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisClusterQueueConfig {
    /// Defaults to `5`.
    pub concurrency: u32,
    /// Defaults to `Some("sockudo_queue:")`.
    pub prefix: Option<String>,
    /// Defaults to the single entry `redis://127.0.0.1:6379`.
    pub nodes: Vec<String>,
    /// Defaults to `5000`.
    pub request_timeout_ms: u64,
}
283
284impl Default for RedisClusterQueueConfig {
285 fn default() -> Self {
286 Self {
287 concurrency: 5,
288 prefix: Some("sockudo_queue:".to_string()),
289 nodes: vec!["redis://127.0.0.1:6379".to_string()],
290 request_timeout_ms: 5000,
291 }
292 }
293}
294
/// Selects the metrics backend. `Prometheus` is currently the only (and default)
/// variant; serde spells it `prometheus`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum MetricsDriver {
    #[default]
    Prometheus,
}
301
/// Log output format; `Human` is the default variant. Serde uses the lowercase
/// names `human` and `json`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum LogOutputFormat {
    #[default]
    Human,
    Json,
}
309
310impl FromStr for LogOutputFormat {
311 type Err = String;
312 fn from_str(s: &str) -> Result<Self, Self::Err> {
313 match s.to_lowercase().as_str() {
314 "human" => Ok(LogOutputFormat::Human),
315 "json" => Ok(LogOutputFormat::Json),
316 _ => Err(format!("Unknown log output format: {s}")),
317 }
318 }
319}
320
321impl FromStr for MetricsDriver {
322 type Err = String;
323 fn from_str(s: &str) -> Result<Self, Self::Err> {
324 match s.to_lowercase().as_str() {
325 "prometheus" => Ok(MetricsDriver::Prometheus),
326 _ => Err(format!("Unknown metrics driver: {s}")),
327 }
328 }
329}
330
331impl AsRef<str> for MetricsDriver {
332 fn as_ref(&self) -> &str {
333 match self {
334 MetricsDriver::Prometheus => "prometheus",
335 }
336 }
337}
338
/// Top-level server configuration that aggregates the settings of every subsystem.
///
/// `#[serde(default)]`: any field missing from the input is taken from this type's
/// `Default` implementation, so a partial configuration document is accepted.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ServerOptions {
    pub adapter: AdapterConfig,
    pub app_manager: AppManagerConfig,
    pub cache: CacheConfig,
    pub channel_limits: ChannelLimits,
    pub cors: CorsConfig,
    pub database: DatabaseConfig,
    pub database_pooling: DatabasePooling,
    pub debug: bool,
    pub event_limits: EventLimits,
    pub host: String,
    pub http_api: HttpApiConfig,
    pub instance: InstanceConfig,
    // The only optional section in this struct.
    pub logging: Option<LoggingConfig>,
    pub metrics: MetricsConfig,
    pub mode: String,
    pub port: u16,
    pub path_prefix: String,
    pub presence: PresenceConfig,
    pub queue: QueueConfig,
    pub rate_limiter: RateLimiterConfig,
    pub shutdown_grace_period: u64,
    pub ssl: SslConfig,
    pub user_authentication_timeout: u64,
    pub webhooks: WebhooksConfig,
    // Passed to `WebSocketConfig::to_sockudo_ws_config` as the payload-size fallback
    // (multiplied by 1024 there) — NOTE(review): call site is outside this view.
    pub websocket_max_payload_kb: u32,
    pub cleanup: CleanupConfig,
    pub activity_timeout: u64,
    // NOTE(review): `AdapterConfig` also carries a `cluster_health` field; confirm which one consumers read.
    pub cluster_health: ClusterHealthConfig,
    pub unix_socket: UnixSocketConfig,
    pub delta_compression: DeltaCompressionOptionsConfig,
    pub tag_filtering: TagFilteringConfig,
    pub websocket: WebSocketConfig,
    pub connection_recovery: ConnectionRecoveryConfig,
    pub idempotency: IdempotencyConfig,
    pub ephemeral: EphemeralConfig,
    pub echo_control: EchoControlConfig,
    pub event_name_filtering: EventNameFilteringConfig,
}
381
/// Settings for the SQS queue driver (embedded in `QueueConfig::sqs`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SqsQueueConfig {
    pub region: String,
    pub queue_url_prefix: Option<String>,
    // NOTE(review): presumably seconds, matching the SQS API — confirm with the consumer.
    pub visibility_timeout: i32,
    pub endpoint_url: Option<String>,
    pub max_messages: i32,
    pub wait_time_seconds: i32,
    pub concurrency: u32,
    pub fifo: bool,
    // NOTE(review): presumably only relevant when `fifo` is true — confirm with the consumer.
    pub message_group_id: Option<String>,
}
397
/// Settings for the SNS queue driver (embedded in `QueueConfig::sns`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SnsQueueConfig {
    pub region: String,
    pub topic_arn: String,
    pub endpoint_url: Option<String>,
}
405
/// Adapter configuration: the selected `driver` plus one settings section per
/// supported backend.
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation, except for the fields that name their own default
/// function below.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AdapterConfig {
    pub driver: AdapterDriver,
    pub redis: RedisAdapterConfig,
    pub cluster: RedisClusterAdapterConfig,
    pub nats: NatsAdapterConfig,
    pub pulsar: PulsarAdapterConfig,
    pub rabbitmq: RabbitMqAdapterConfig,
    pub google_pubsub: GooglePubSubAdapterConfig,
    pub kafka: KafkaAdapterConfig,
    /// Falls back to `default_buffer_multiplier_per_cpu()` (64) when absent.
    #[serde(default = "default_buffer_multiplier_per_cpu")]
    pub buffer_multiplier_per_cpu: usize,
    pub cluster_health: ClusterHealthConfig,
    /// Falls back to `default_enable_socket_counting()` (`true`) when absent.
    #[serde(default = "default_enable_socket_counting")]
    pub enable_socket_counting: bool,
    /// Falls back to `default_fallback_to_local()` (`true`) when absent.
    #[serde(default = "default_fallback_to_local")]
    pub fallback_to_local: bool,
}
425
/// Serde/`Default` fallback for `AdapterConfig::enable_socket_counting`: enabled.
fn default_enable_socket_counting() -> bool {
    const ENABLE_SOCKET_COUNTING: bool = true;
    ENABLE_SOCKET_COUNTING
}
429
/// Serde/`Default` fallback for `AdapterConfig::fallback_to_local`: enabled.
fn default_fallback_to_local() -> bool {
    const FALLBACK_TO_LOCAL: bool = true;
    FALLBACK_TO_LOCAL
}
433
/// Serde/`Default` fallback for `AdapterConfig::buffer_multiplier_per_cpu`: 64.
fn default_buffer_multiplier_per_cpu() -> usize {
    const BUFFER_MULTIPLIER_PER_CPU: usize = 64;
    BUFFER_MULTIPLIER_PER_CPU
}
437
438impl Default for AdapterConfig {
439 fn default() -> Self {
440 Self {
441 driver: AdapterDriver::default(),
442 redis: RedisAdapterConfig::default(),
443 cluster: RedisClusterAdapterConfig::default(),
444 nats: NatsAdapterConfig::default(),
445 pulsar: PulsarAdapterConfig::default(),
446 rabbitmq: RabbitMqAdapterConfig::default(),
447 google_pubsub: GooglePubSubAdapterConfig::default(),
448 kafka: KafkaAdapterConfig::default(),
449 buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
450 cluster_health: ClusterHealthConfig::default(),
451 enable_socket_counting: default_enable_socket_counting(),
452 fallback_to_local: default_fallback_to_local(),
453 }
454 }
455}
456
/// Settings for the Redis adapter (embedded in `AdapterConfig::redis`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisAdapterConfig {
    // NOTE(review): unit is not visible here (sibling configs use `_ms` names) — confirm.
    pub requests_timeout: u64,
    pub prefix: String,
    // Free-form option maps; keys and accepted values are defined by the consumer.
    pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
    pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
    pub cluster_mode: bool,
}
466
/// Settings for the Redis Cluster adapter (embedded in `AdapterConfig::cluster`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisClusterAdapterConfig {
    pub nodes: Vec<String>,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub use_connection_manager: bool,
    // The field-level attribute makes a missing value fall back to `bool::default()`
    // (false) rather than to the value in this struct's `Default` impl.
    #[serde(default)]
    pub use_sharded_pubsub: bool,
}
477
/// Settings for the NATS adapter (embedded in `AdapterConfig::nats`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct NatsAdapterConfig {
    pub servers: Vec<String>,
    pub prefix: String,
    pub request_timeout_ms: u64,
    // NOTE(review): presumably `username`/`password` and `token` are alternative auth modes — confirm.
    pub username: Option<String>,
    pub password: Option<String>,
    pub token: Option<String>,
    pub connection_timeout_ms: u64,
    pub nodes_number: Option<u32>,
    pub discovery_max_wait_ms: u64,
    pub discovery_idle_wait_ms: u64,
}
492
/// Settings for the Pulsar adapter (embedded in `AdapterConfig::pulsar`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PulsarAdapterConfig {
    pub url: String,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub token: Option<String>,
    pub nodes_number: Option<u32>,
}
502
/// Settings for the RabbitMQ adapter (embedded in `AdapterConfig::rabbitmq`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RabbitMqAdapterConfig {
    pub url: String,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub connection_timeout_ms: u64,
    pub nodes_number: Option<u32>,
}
512
/// Settings for the Google Pub/Sub adapter (embedded in
/// `AdapterConfig::google_pubsub`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GooglePubSubAdapterConfig {
    pub project_id: String,
    pub prefix: String,
    pub request_timeout_ms: u64,
    // NOTE(review): presumably points the client at a local Pub/Sub emulator — confirm.
    pub emulator_host: Option<String>,
    pub nodes_number: Option<u32>,
}
522
/// Settings for the Kafka adapter (embedded in `AdapterConfig::kafka`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct KafkaAdapterConfig {
    pub brokers: Vec<String>,
    pub prefix: String,
    pub request_timeout_ms: u64,
    // Free-form strings; accepted values are defined by the Kafka client in use.
    pub security_protocol: Option<String>,
    pub sasl_mechanism: Option<String>,
    pub sasl_username: Option<String>,
    pub sasl_password: Option<String>,
    pub nodes_number: Option<u32>,
}
535
/// App-manager configuration: the selected `driver` plus its supporting sections.
///
/// `Default` is derived, and `#[serde(default)]` fills any field missing from the
/// input with that derived default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct AppManagerConfig {
    pub driver: AppManagerDriver,
    pub array: ArrayConfig,
    pub cache: CacheSettings,
    pub scylladb: ScyllaDbSettings,
}
544
/// Inline list of applications (embedded in `AppManagerConfig::array`).
///
/// `Default` is derived, so the list is empty unless the input provides `apps`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ArrayConfig {
    pub apps: Vec<App>,
}
550
/// Cache toggle and TTL used by the app manager (embedded in
/// `AppManagerConfig::cache`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CacheSettings {
    pub enabled: bool,
    // NOTE(review): unit is presumably seconds — confirm with the consumer.
    pub ttl: u64,
}
557
/// Options for the in-memory cache (embedded in `CacheConfig::memory`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MemoryCacheOptions {
    // NOTE(review): units of `ttl` and `cleanup_interval` are presumably seconds — confirm.
    pub ttl: u64,
    pub cleanup_interval: u64,
    pub max_capacity: u64,
}
565
/// Cache configuration: the selected `driver` plus the Redis and in-memory
/// sections.
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CacheConfig {
    pub driver: CacheDriver,
    pub redis: RedisConfig,
    pub memory: MemoryCacheOptions,
}
573
/// Redis options shared by the cache and rate-limiter sections (`CacheConfig::redis`,
/// `RateLimiterConfig::redis`).
///
/// `Default` is derived: no prefix, no URL override, `cluster_mode` off.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RedisConfig {
    pub prefix: Option<String>,
    // NOTE(review): presumably replaces the URL built from `DatabaseConfig::redis` — confirm.
    pub url_override: Option<String>,
    pub cluster_mode: bool,
}
581
/// Channel-related limits (embedded in `ServerOptions::channel_limits`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ChannelLimits {
    pub max_name_length: u32,
    // NOTE(review): unit is presumably seconds — confirm with the consumer.
    pub cache_ttl: u64,
}
588
/// CORS configuration (embedded in `ServerOptions::cors`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CorsConfig {
    pub credentials: bool,
    /// Origin patterns; validated during deserialization by
    /// `deserialize_and_validate_cors_origins`, which rejects the whole document
    /// when any pattern is invalid.
    #[serde(deserialize_with = "deserialize_and_validate_cors_origins")]
    pub origin: Vec<String>,
    pub methods: Vec<String>,
    pub allowed_headers: Vec<String>,
}
598
599fn deserialize_and_validate_cors_origins<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
600where
601 D: serde::Deserializer<'de>,
602{
603 use serde::de::Error;
604 let origins = Vec::<String>::deserialize(deserializer)?;
605
606 if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&origins) {
607 return Err(D::Error::custom(format!(
608 "CORS origin pattern validation failed: {}",
609 e
610 )));
611 }
612
613 Ok(origins)
614}
615
/// Connection settings for every supported database backend.
///
/// `Default` is derived, and `#[serde(default)]` fills any section missing from
/// the input with that section's own default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct DatabaseConfig {
    pub mysql: DatabaseConnection,
    pub postgres: DatabaseConnection,
    pub redis: RedisConnection,
    pub dynamodb: DynamoDbSettings,
    pub surrealdb: SurrealDbSettings,
    pub scylladb: ScyllaDbSettings,
}
626
/// SQL database connection settings (used for `DatabaseConfig::mysql` and
/// `DatabaseConfig::postgres`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabaseConnection {
    pub host: String,
    pub port: u16,
    pub username: String,
    pub password: String,
    pub database: String,
    pub table_name: String,
    pub connection_pool_size: u32,
    /// Optional per-connection pool minimum; `override_db_pool_settings` sets it
    /// from the `<prefix>_POOL_MIN` environment variable.
    pub pool_min: Option<u32>,
    /// Optional per-connection pool maximum; `override_db_pool_settings` sets it
    /// from the `<prefix>_POOL_MAX` environment variable.
    pub pool_max: Option<u32>,
    // NOTE(review): units of the cache timings are presumably seconds — confirm.
    pub cache_ttl: u64,
    pub cache_cleanup_interval: u64,
    pub cache_max_capacity: u64,
}
643
/// Redis connection settings covering standalone, sentinel and cluster setups.
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisConnection {
    /// Host name; may carry a `redis://` or `rediss://` prefix, in which case
    /// `to_url` keeps that scheme.
    pub host: String,
    pub port: u16,
    /// Database index appended as the last path segment of the generated URL.
    pub db: u32,
    pub username: Option<String>,
    pub password: Option<String>,
    pub key_prefix: String,
    /// When non-empty, `to_url` produces a `redis+sentinel://` URL.
    pub sentinels: Vec<RedisSentinel>,
    pub sentinel_password: Option<String>,
    /// Path segment placed right after the sentinel host list in sentinel URLs.
    pub name: String,
    /// Cluster settings; its `nodes` list takes precedence over `cluster_nodes`.
    pub cluster: RedisClusterConnection,
    /// Node list consulted by `cluster_node_urls` only when `cluster.nodes` is empty.
    pub cluster_nodes: Vec<ClusterNode>,
}
660
/// Address of a single Redis sentinel (an entry of `RedisConnection::sentinels`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisSentinel {
    pub host: String,
    pub port: u16,
}
667
/// Redis Cluster connection settings (embedded in `RedisConnection::cluster`).
///
/// `Default` is derived: no nodes, no credentials, TLS off.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RedisClusterConnection {
    pub nodes: Vec<ClusterNode>,
    /// When set, used instead of `RedisConnection::username` for cluster node URLs.
    pub username: Option<String>,
    /// When set, used instead of `RedisConnection::password` for cluster node URLs.
    pub password: Option<String>,
    /// Selects the `rediss` scheme for nodes given as plain hosts. Also accepted
    /// under the key `useTLS`.
    #[serde(alias = "useTLS")]
    pub use_tls: bool,
}
677
678impl RedisConnection {
679 pub fn is_sentinel_configured(&self) -> bool {
681 !self.sentinels.is_empty()
682 }
683
684 pub fn to_url(&self) -> String {
686 if self.is_sentinel_configured() {
687 self.build_sentinel_url()
688 } else {
689 self.build_standard_url()
690 }
691 }
692
693 fn build_standard_url(&self) -> String {
694 let (scheme, host) = if self.host.starts_with("rediss://") {
696 ("rediss://", self.host.trim_start_matches("rediss://"))
697 } else if self.host.starts_with("redis://") {
698 ("redis://", self.host.trim_start_matches("redis://"))
699 } else {
700 ("redis://", self.host.as_str())
701 };
702
703 let mut url = String::from(scheme);
704
705 if let Some(ref username) = self.username {
706 url.push_str(username);
707 if let Some(ref password) = self.password {
708 url.push(':');
709 url.push_str(&urlencoding::encode(password));
710 }
711 url.push('@');
712 } else if let Some(ref password) = self.password {
713 url.push(':');
714 url.push_str(&urlencoding::encode(password));
715 url.push('@');
716 }
717
718 url.push_str(host);
719 url.push(':');
720 url.push_str(&self.port.to_string());
721 url.push('/');
722 url.push_str(&self.db.to_string());
723
724 url
725 }
726
727 fn build_sentinel_url(&self) -> String {
728 let mut url = String::from("redis+sentinel://");
729
730 if let Some(ref sentinel_password) = self.sentinel_password {
731 url.push(':');
732 url.push_str(&urlencoding::encode(sentinel_password));
733 url.push('@');
734 }
735
736 let sentinel_hosts: Vec<String> = self
737 .sentinels
738 .iter()
739 .map(|s| format!("{}:{}", s.host, s.port))
740 .collect();
741 url.push_str(&sentinel_hosts.join(","));
742
743 url.push('/');
744 url.push_str(&self.name);
745 url.push('/');
746 url.push_str(&self.db.to_string());
747
748 let mut params = Vec::new();
749 if let Some(ref password) = self.password {
750 params.push(format!("password={}", urlencoding::encode(password)));
751 }
752 if let Some(ref username) = self.username {
753 params.push(format!("username={}", urlencoding::encode(username)));
754 }
755
756 if !params.is_empty() {
757 url.push('?');
758 url.push_str(¶ms.join("&"));
759 }
760
761 url
762 }
763
764 pub fn has_cluster_nodes(&self) -> bool {
767 !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
768 }
769
770 pub fn cluster_node_urls(&self) -> Vec<String> {
773 if !self.cluster.nodes.is_empty() {
774 return self.build_cluster_urls(&self.cluster.nodes);
775 }
776 self.build_cluster_urls(&self.cluster_nodes)
777 }
778
779 pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
782 self.build_cluster_urls(
783 &seeds
784 .iter()
785 .filter_map(|seed| ClusterNode::from_seed(seed))
786 .collect::<Vec<ClusterNode>>(),
787 )
788 }
789
790 fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
791 let username = self
792 .cluster
793 .username
794 .as_deref()
795 .or(self.username.as_deref());
796 let password = self
797 .cluster
798 .password
799 .as_deref()
800 .or(self.password.as_deref());
801 let use_tls = self.cluster.use_tls;
802
803 nodes
804 .iter()
805 .map(|node| node.to_url_with_options(use_tls, username, password))
806 .collect()
807 }
808}
809
810impl RedisSentinel {
811 pub fn to_host_port(&self) -> String {
812 format!("{}:{}", self.host, self.port)
813 }
814}
815
/// A single Redis Cluster node.
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ClusterNode {
    /// Plain host, `host:port`, bracketed IPv6 (`[::1]:6379`), or a full
    /// `redis://` / `rediss://` URL.
    pub host: String,
    /// Port used when `host` does not carry one itself.
    pub port: u16,
}
822
823impl ClusterNode {
824 pub fn to_url(&self) -> String {
825 self.to_url_with_options(false, None, None)
826 }
827
828 pub fn to_url_with_options(
829 &self,
830 use_tls: bool,
831 username: Option<&str>,
832 password: Option<&str>,
833 ) -> String {
834 let host = self.host.trim();
835
836 if host.starts_with("redis://") || host.starts_with("rediss://") {
837 if let Ok(parsed) = Url::parse(host)
838 && let Some(host_str) = parsed.host_str()
839 {
840 let scheme = parsed.scheme();
841 let port = parsed.port_or_known_default().unwrap_or(self.port);
842 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
843 let parsed_password = parsed.password();
844 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
845 let (effective_username, effective_password) = if has_embedded_auth {
846 (parsed_username, parsed_password)
847 } else {
848 (username, password)
849 };
850
851 return build_redis_url(
852 scheme,
853 host_str,
854 port,
855 effective_username,
856 effective_password,
857 );
858 }
859
860 let has_port = if let Some(bracket_pos) = host.rfind(']') {
862 host[bracket_pos..].contains(':')
863 } else {
864 host.split(':').count() >= 3
865 };
866 let base = if has_port {
867 host.to_string()
868 } else {
869 format!("{}:{}", host, self.port)
870 };
871
872 if let Ok(parsed) = Url::parse(&base) {
873 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
874 let parsed_password = parsed.password();
875 if let Some(host_str) = parsed.host_str() {
876 let port = parsed.port_or_known_default().unwrap_or(self.port);
877 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
878 let (effective_username, effective_password) = if has_embedded_auth {
879 (parsed_username, parsed_password)
880 } else {
881 (username, password)
882 };
883 return build_redis_url(
884 parsed.scheme(),
885 host_str,
886 port,
887 effective_username,
888 effective_password,
889 );
890 }
891 }
892 return base;
893 }
894
895 let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
896 let scheme = if use_tls { "rediss" } else { "redis" };
897 build_redis_url(
898 scheme,
899 &normalized_host,
900 normalized_port,
901 username,
902 password,
903 )
904 }
905
906 pub fn from_seed(seed: &str) -> Option<Self> {
907 let trimmed = seed.trim();
908 if trimmed.is_empty() {
909 return None;
910 }
911
912 if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
913 let port = Url::parse(trimmed)
914 .ok()
915 .and_then(|parsed| parsed.port_or_known_default())
916 .unwrap_or(6379);
917 return Some(Self {
918 host: trimmed.to_string(),
919 port,
920 });
921 }
922
923 let (host, port) = split_plain_host_and_port(trimmed, 6379);
924 Some(Self { host, port })
925 }
926}
927
/// Splits a scheme-less host specification into `(host, port)`.
///
/// Supported shapes (after trimming whitespace):
/// * `[v6]:port` / `[v6]` — brackets are removed; an absent or unparsable port
///   yields `default_port`.
/// * `host:port` with exactly one colon and a valid `u16` port.
/// * anything else (bare host, bare IPv6, unparsable port, unclosed bracket) is
///   returned unchanged together with `default_port`.
fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
    let host = raw_host.trim();

    if let Some(after_bracket) = host.strip_prefix('[') {
        return match after_bracket.split_once(']') {
            Some((inner, rest)) => {
                let port = rest
                    .strip_prefix(':')
                    .and_then(|text| text.parse::<u16>().ok())
                    .unwrap_or(default_port);
                (inner.to_string(), port)
            }
            // Unclosed bracket: keep the input untouched.
            None => (host.to_string(), default_port),
        };
    }

    // Exactly two colon-separated pieces means exactly one colon, i.e. `host:port`.
    let mut pieces = host.split(':');
    if let (Some(name), Some(port_text), None) = (pieces.next(), pieces.next(), pieces.next()) {
        if let Ok(port) = port_text.parse::<u16>() {
            return (name.to_string(), port);
        }
    }

    (host.to_string(), default_port)
}
956
957fn build_redis_url(
958 scheme: &str,
959 host: &str,
960 port: u16,
961 username: Option<&str>,
962 password: Option<&str>,
963) -> String {
964 let mut url = format!("{scheme}://");
965
966 if let Some(user) = username {
967 url.push_str(&urlencoding::encode(user));
968 if let Some(pass) = password {
969 url.push(':');
970 url.push_str(&urlencoding::encode(pass));
971 }
972 url.push('@');
973 } else if let Some(pass) = password {
974 url.push(':');
975 url.push_str(&urlencoding::encode(pass));
976 url.push('@');
977 }
978
979 if host.contains(':') && !host.starts_with('[') {
980 url.push('[');
981 url.push_str(host);
982 url.push(']');
983 } else {
984 url.push_str(host);
985 }
986 url.push(':');
987 url.push_str(&port.to_string());
988 url
989}
990
/// Global database pooling settings (embedded in `ServerOptions::database_pooling`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabasePooling {
    pub enabled: bool,
    pub min: u32,
    pub max: u32,
}
998
/// Event-related limits (embedded in `ServerOptions::event_limits`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EventLimits {
    pub max_channels_at_once: u32,
    pub max_name_length: u32,
    pub max_payload_in_kb: u32,
    pub max_batch_size: u32,
}
1007
/// Idempotency settings (embedded in `ServerOptions::idempotency`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct IdempotencyConfig {
    pub enabled: bool,
    pub ttl_seconds: u64,
    pub max_key_length: usize,
}
1018
/// Toggle for the ephemeral feature (embedded in `ServerOptions::ephemeral`).
///
/// `#[serde(default)]`: a missing `enabled` is taken from this type's `Default`
/// implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EphemeralConfig {
    pub enabled: bool,
}
1025
/// Echo-control settings (embedded in `ServerOptions::echo_control`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EchoControlConfig {
    pub enabled: bool,
    pub default_echo_messages: bool,
}
1034
/// Event-name filtering settings (embedded in
/// `ServerOptions::event_name_filtering`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EventNameFilteringConfig {
    pub enabled: bool,
    pub max_events_per_filter: usize,
    pub max_event_name_length: usize,
}
1045
/// Connection-recovery settings (embedded in `ServerOptions::connection_recovery`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ConnectionRecoveryConfig {
    pub enabled: bool,
    pub buffer_ttl_seconds: u64,
    // NOTE(review): whether this counts messages or bytes is not visible here — confirm.
    pub max_buffer_size: usize,
}
1059
/// HTTP API settings (embedded in `ServerOptions::http_api`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HttpApiConfig {
    pub request_limit_in_mb: u32,
    pub accept_traffic: AcceptTraffic,
    pub usage_enabled: bool,
}
1067
/// Traffic-acceptance settings (embedded in `HttpApiConfig::accept_traffic`).
///
/// `#[serde(default)]`: a missing field is taken from this type's `Default`
/// implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AcceptTraffic {
    // NOTE(review): scale (fraction vs. percent) is not visible here — confirm with the consumer.
    pub memory_threshold: f64,
}
1073
/// Instance identity settings (embedded in `ServerOptions::instance`).
///
/// `#[serde(default)]`: a missing field is taken from this type's `Default`
/// implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct InstanceConfig {
    pub process_id: String,
}
1079
/// Metrics settings (embedded in `ServerOptions::metrics`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MetricsConfig {
    pub enabled: bool,
    pub driver: MetricsDriver,
    pub host: String,
    pub prometheus: PrometheusConfig,
    pub port: u16,
}
1089
/// Prometheus-specific settings (embedded in `MetricsConfig::prometheus`).
///
/// `#[serde(default)]`: a missing field is taken from this type's `Default`
/// implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PrometheusConfig {
    pub prefix: String,
}
1095
/// Logging settings (the optional `ServerOptions::logging` section).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingConfig {
    pub colors_enabled: bool,
    pub include_target: bool,
}
1102
/// Presence-channel limits (embedded in `ServerOptions::presence`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PresenceConfig {
    pub max_members_per_channel: u32,
    pub max_member_size_in_kb: u32,
}
1109
/// WebSocket buffering and transport settings (embedded in
/// `ServerOptions::websocket`).
///
/// `#[serde(default)]`: fields missing from the input are taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebSocketConfig {
    /// Message-count limit for the buffer (default `Some(1000)`). When both this
    /// and `max_bytes` are `None`, `to_buffer_config` falls back to 1000 messages.
    pub max_messages: Option<usize>,
    /// Byte limit for the buffer (default `None`). Also used by
    /// `to_sockudo_ws_config` as the maximum payload length when set.
    pub max_bytes: Option<usize>,
    /// Default `true`; forwarded as `disconnect_on_full` by `to_buffer_config`.
    pub disconnect_on_buffer_full: bool,
    /// Default 64 MiB.
    pub max_message_size: usize,
    /// Default 16 MiB.
    pub max_frame_size: usize,
    /// Default 16 KiB.
    pub write_buffer_size: usize,
    /// Default 1 MiB.
    pub max_backpressure: usize,
    /// Default `true`.
    pub auto_ping: bool,
    /// Default `30`. `to_sockudo_ws_config` raises it to at least half of the
    /// activity timeout (and never below 5).
    pub ping_interval: u32,
    /// Default `120`.
    pub idle_timeout: u32,
    /// Compression mode name, matched case-insensitively by `to_sockudo_ws_config`
    /// (`dedicated`, `shared`, `window256b` … `window32kb`); any other value,
    /// including the default `disabled`, turns compression off.
    pub compression: String,
}
1127
1128impl Default for WebSocketConfig {
1129 fn default() -> Self {
1130 Self {
1131 max_messages: Some(1000),
1132 max_bytes: None,
1133 disconnect_on_buffer_full: true,
1134 max_message_size: 64 * 1024 * 1024,
1135 max_frame_size: 16 * 1024 * 1024,
1136 write_buffer_size: 16 * 1024,
1137 max_backpressure: 1024 * 1024,
1138 auto_ping: true,
1139 ping_interval: 30,
1140 idle_timeout: 120,
1141 compression: "disabled".to_string(),
1142 }
1143 }
1144}
1145
1146impl WebSocketConfig {
1147 pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
1149 use crate::websocket::{BufferLimit, WebSocketBufferConfig};
1150
1151 let limit = match (self.max_messages, self.max_bytes) {
1152 (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
1153 (Some(messages), None) => BufferLimit::Messages(messages),
1154 (None, Some(bytes)) => BufferLimit::Bytes(bytes),
1155 (None, None) => BufferLimit::Messages(1000),
1156 };
1157
1158 WebSocketBufferConfig {
1159 limit,
1160 disconnect_on_full: self.disconnect_on_buffer_full,
1161 }
1162 }
1163
1164 pub fn to_sockudo_ws_config(
1166 &self,
1167 websocket_max_payload_kb: u32,
1168 activity_timeout: u64,
1169 ) -> sockudo_ws::Config {
1170 use sockudo_ws::Compression;
1171
1172 let compression = match self.compression.to_lowercase().as_str() {
1173 "dedicated" => Compression::Dedicated,
1174 "shared" => Compression::Shared,
1175 "window256b" => Compression::Window256B,
1176 "window1kb" => Compression::Window1KB,
1177 "window2kb" => Compression::Window2KB,
1178 "window4kb" => Compression::Window4KB,
1179 "window8kb" => Compression::Window8KB,
1180 "window16kb" => Compression::Window16KB,
1181 "window32kb" => Compression::Window32KB,
1182 _ => Compression::Disabled,
1183 };
1184
1185 sockudo_ws::Config::builder()
1186 .max_payload_length(
1187 self.max_bytes
1188 .unwrap_or(websocket_max_payload_kb as usize * 1024),
1189 )
1190 .max_message_size(self.max_message_size)
1191 .max_frame_size(self.max_frame_size)
1192 .write_buffer_size(self.write_buffer_size)
1193 .max_backpressure(self.max_backpressure)
1194 .idle_timeout(self.idle_timeout)
1195 .auto_ping(self.auto_ping)
1196 .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1197 .compression(compression)
1198 .build()
1199 }
1200}
1201
/// Queue configuration: the selected `driver` plus one settings section per
/// supported backend.
///
/// `Default` is derived (so the driver defaults to `QueueDriver::Redis`), and
/// `#[serde(default)]` fills any field missing from the input.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct QueueConfig {
    pub driver: QueueDriver,
    pub redis: RedisQueueConfig,
    pub redis_cluster: RedisClusterQueueConfig,
    pub sqs: SqsQueueConfig,
    pub sns: SnsQueueConfig,
}
1211
/// Settings for the Redis-backed queue.
///
/// Missing fields fall back to the values in this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisQueueConfig {
    /// Worker concurrency (default: 5).
    pub concurrency: u32,
    /// Optional key prefix (default: `"sockudo_queue:"`).
    pub prefix: Option<String>,
    /// Optional connection URL override (default: none).
    // NOTE(review): presumably replaces the shared Redis connection settings — confirm.
    pub url_override: Option<String>,
    /// Whether cluster mode is used (default: `false`).
    pub cluster_mode: bool,
}
1220
/// A single rate-limit rule: a request budget over a time window.
///
/// Missing fields fall back to the values in this type's `Default` impl.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct RateLimit {
    /// Maximum number of requests allowed per window (default: 60).
    pub max_requests: u32,
    /// Window length in seconds (default: 60).
    pub window_seconds: u64,
    /// Optional label identifying this rule (default: `"default"`).
    pub identifier: Option<String>,
    /// Optional hop count (default: `Some(0)`).
    // NOTE(review): presumably the number of trusted proxy hops used when
    // resolving the client address — confirm against the rate-limit middleware.
    pub trust_hops: Option<u32>,
}
1229
/// Rate limiter configuration: global switch, backing driver, and the separate
/// rules applied to the HTTP API and to WebSocket connections.
///
/// Missing fields fall back to the values in this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RateLimiterConfig {
    /// Master switch for rate limiting (default: `true`).
    pub enabled: bool,
    /// Backend that stores the counters (default: `CacheDriver::Memory`).
    pub driver: CacheDriver,
    /// Rule for the HTTP API (default: 100 requests / 60 s).
    pub api_rate_limit: RateLimit,
    /// Rule for WebSocket connections (default: 20 requests / 60 s).
    pub websocket_rate_limit: RateLimit,
    /// Redis settings for the limiter (default prefix: `"sockudo_rl:"`).
    pub redis: RedisConfig,
}
1239
/// TLS settings for the server.
///
/// Missing fields fall back to the values in this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SslConfig {
    /// Whether TLS is enabled (default: `false`).
    pub enabled: bool,
    /// Path to the certificate file (default: empty).
    pub cert_path: String,
    /// Path to the private key file (default: empty).
    pub key_path: String,
    /// Optional passphrase (default: none).
    pub passphrase: Option<String>,
    /// Optional path to a CA file (default: none).
    pub ca_path: Option<String>,
    /// Whether plain HTTP should be redirected (default: `false`).
    pub redirect_http: bool,
    /// Optional plain-HTTP port (default: `Some(80)`).
    // NOTE(review): presumably only relevant when `redirect_http` is on — confirm.
    pub http_port: Option<u16>,
}
1251
/// Webhook delivery settings: batching, retry policy and request timeout.
///
/// Missing fields fall back to the values in this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebhooksConfig {
    /// Batching behaviour for outgoing webhooks.
    pub batching: BatchingConfig,
    /// Retry policy for failed deliveries.
    pub retry: WebhookRetryConfig,
    /// Per-request timeout in milliseconds (default: 10 000).
    pub request_timeout_ms: u64,
}
1259
/// Webhook batching settings.
///
/// Missing fields fall back to the values in this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct BatchingConfig {
    /// Whether batching is enabled (default: `true`).
    pub enabled: bool,
    /// Batching duration (default: 50).
    // NOTE(review): unit is not visible here; presumably milliseconds — confirm.
    pub duration: u64,
    /// Batch size (default: 100).
    pub size: usize,
}
1267
/// Retry policy for webhook deliveries.
///
/// Missing fields fall back to the values in this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebhookRetryConfig {
    /// Whether retries are enabled (default: `true`).
    pub enabled: bool,
    /// Optional cap on the number of attempts (default: none).
    pub max_attempts: Option<u32>,
    /// Total time budget for retries, in milliseconds (default: 300 000).
    pub max_elapsed_time_ms: u64,
    /// First backoff delay, in milliseconds (default: 1 000).
    pub initial_backoff_ms: u64,
    /// Upper bound for a single backoff delay, in milliseconds (default: 60 000).
    pub max_backoff_ms: u64,
}
1277
1278impl Default for WebhooksConfig {
1279 fn default() -> Self {
1280 Self {
1281 batching: BatchingConfig::default(),
1282 retry: WebhookRetryConfig::default(),
1283 request_timeout_ms: 10_000,
1284 }
1285 }
1286}
1287
1288impl Default for WebhookRetryConfig {
1289 fn default() -> Self {
1290 Self {
1291 enabled: true,
1292 max_attempts: None,
1293 max_elapsed_time_ms: 300_000,
1294 initial_backoff_ms: 1_000,
1295 max_backoff_ms: 60_000,
1296 }
1297 }
1298}
1299
/// Cluster health-check timing settings.
///
/// Missing fields fall back to the values in this type's `Default` impl.
/// `ClusterHealthConfig::validate` enforces that all intervals are non-zero,
/// that the heartbeat interval is at most a third of the node timeout, and that
/// the cleanup interval does not exceed the node timeout.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ClusterHealthConfig {
    /// Whether cluster health tracking is enabled (default: `true`).
    pub enabled: bool,
    /// Heartbeat interval in milliseconds (default: 10 000).
    pub heartbeat_interval_ms: u64,
    /// Node timeout in milliseconds (default: 30 000).
    pub node_timeout_ms: u64,
    /// Cleanup interval in milliseconds (default: 10 000).
    pub cleanup_interval_ms: u64,
}
1308
/// Unix domain socket settings.
///
/// Missing fields fall back to the values in this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct UnixSocketConfig {
    /// Whether the Unix socket is enabled (default: `false`).
    pub enabled: bool,
    /// Filesystem path of the socket (default: `/var/run/sockudo/sockudo.sock`).
    pub path: String,
    /// Permission bits for the socket (default: `0o660`).
    ///
    /// Deserialized from a string of octal digits (e.g. `"660"`); values above
    /// `777` or containing non-octal characters are rejected.
    #[serde(deserialize_with = "deserialize_octal_permission")]
    pub permission_mode: u32,
}
1317
/// Delta-compression settings.
///
/// Missing fields fall back to the values in this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DeltaCompressionOptionsConfig {
    /// Whether delta compression is enabled (default: `true`).
    pub enabled: bool,
    /// Name of the delta algorithm (default: `"fossil"`).
    pub algorithm: String,
    /// Full-message interval (default: 10).
    // NOTE(review): presumably "send a full message every N messages" — confirm.
    pub full_message_interval: u32,
    /// Minimum message size (default: 100).
    // NOTE(review): presumably bytes; smaller messages likely skip compression — confirm.
    pub min_message_size: usize,
    /// Maximum age of stored state, in seconds (default: 300).
    pub max_state_age_secs: u64,
    /// Maximum number of channel states kept per socket (default: 100).
    pub max_channel_states_per_socket: usize,
    /// Optional cap on conflation states per channel (default: `Some(100)`).
    pub max_conflation_states_per_channel: Option<usize>,
    /// Optional conflation key path (default: none).
    pub conflation_key_path: Option<String>,
    /// Whether cluster coordination is enabled (default: `false`).
    pub cluster_coordination: bool,
    /// Whether the delta algorithm name is omitted (default: `false`).
    // NOTE(review): presumably omitted from outgoing payloads — confirm.
    pub omit_delta_algorithm: bool,
}
1332
/// Settings for the cleanup subsystem.
///
/// Missing fields fall back to the values in this type's `Default` impl.
/// `CleanupConfig::validate` rejects zero values for `queue_buffer_size`,
/// `batch_size`, `batch_timeout_ms` and a fixed worker count of zero.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CleanupConfig {
    /// Capacity of the cleanup queue (default: 1024).
    pub queue_buffer_size: usize,
    /// Number of items per batch (default: 64).
    pub batch_size: usize,
    /// Batch timeout in milliseconds (default: 100).
    pub batch_timeout_ms: u64,
    /// Worker thread policy: automatic or a fixed count (default: `Auto`).
    pub worker_threads: WorkerThreadsConfig,
    /// Maximum retry attempts (default: 3).
    pub max_retry_attempts: u32,
    /// Whether asynchronous cleanup is enabled (default: `true`).
    pub async_enabled: bool,
    /// Whether to fall back to synchronous cleanup (default: `true`).
    pub fallback_to_sync: bool,
}
1345
/// Worker thread count policy.
///
/// Has hand-written serde impls: serialized as the string `"auto"` or as a
/// plain integer.
#[derive(Debug, Clone)]
pub enum WorkerThreadsConfig {
    /// Let the runtime pick the number of workers.
    Auto,
    /// Use exactly this many workers.
    Fixed(usize),
}
1352
1353impl serde::Serialize for WorkerThreadsConfig {
1354 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1355 where
1356 S: serde::Serializer,
1357 {
1358 match self {
1359 WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1360 WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1361 }
1362 }
1363}
1364
1365impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1366 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1367 where
1368 D: serde::Deserializer<'de>,
1369 {
1370 use serde::de;
1371 struct WorkerThreadsVisitor;
1372 impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1373 type Value = WorkerThreadsConfig;
1374
1375 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1376 formatter.write_str(r#""auto" or a positive integer"#)
1377 }
1378
1379 fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1380 if value.eq_ignore_ascii_case("auto") {
1381 Ok(WorkerThreadsConfig::Auto)
1382 } else if let Ok(n) = value.parse::<usize>() {
1383 Ok(WorkerThreadsConfig::Fixed(n))
1384 } else {
1385 Err(E::custom(format!(
1386 "expected 'auto' or a number, got '{value}'"
1387 )))
1388 }
1389 }
1390
1391 fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1392 Ok(WorkerThreadsConfig::Fixed(value as usize))
1393 }
1394
1395 fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1396 if value >= 0 {
1397 Ok(WorkerThreadsConfig::Fixed(value as usize))
1398 } else {
1399 Err(E::custom("worker_threads must be non-negative"))
1400 }
1401 }
1402 }
1403 deserializer.deserialize_any(WorkerThreadsVisitor)
1404 }
1405}
1406
1407impl Default for CleanupConfig {
1408 fn default() -> Self {
1409 Self {
1410 queue_buffer_size: 1024,
1411 batch_size: 64,
1412 batch_timeout_ms: 100,
1413 worker_threads: WorkerThreadsConfig::Auto,
1414 max_retry_attempts: 3,
1415 async_enabled: true,
1416 fallback_to_sync: true,
1417 }
1418 }
1419}
1420
1421impl CleanupConfig {
1422 pub fn validate(&self) -> Result<(), String> {
1423 if self.queue_buffer_size == 0 {
1424 return Err("queue_buffer_size must be greater than 0".to_string());
1425 }
1426 if self.batch_size == 0 {
1427 return Err("batch_size must be greater than 0".to_string());
1428 }
1429 if self.batch_timeout_ms == 0 {
1430 return Err("batch_timeout_ms must be greater than 0".to_string());
1431 }
1432 if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1433 && n == 0
1434 {
1435 return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1436 }
1437 Ok(())
1438 }
1439}
1440
1441#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1442#[serde(default)]
1443pub struct TagFilteringConfig {
1444 #[serde(default)]
1445 pub enabled: bool,
1446 #[serde(default = "default_true")]
1447 pub enable_tags: bool,
1448}
1449
/// Serde default helper: yields `true` for boolean fields that should be on
/// when the key is missing (referenced via `#[serde(default = "default_true")]`).
fn default_true() -> bool {
    true
}
1453
/// Baseline server configuration used when no file or environment override is
/// present: listens on `0.0.0.0:6001` in `"production"` mode with debug off,
/// and delegates every nested section to its own `Default` impl.
impl Default for ServerOptions {
    fn default() -> Self {
        Self {
            adapter: AdapterConfig::default(),
            app_manager: AppManagerConfig::default(),
            cache: CacheConfig::default(),
            channel_limits: ChannelLimits::default(),
            cors: CorsConfig::default(),
            database: DatabaseConfig::default(),
            database_pooling: DatabasePooling::default(),
            debug: false,
            tag_filtering: TagFilteringConfig::default(),
            event_limits: EventLimits::default(),
            // Bind on all interfaces by default.
            host: "0.0.0.0".to_string(),
            http_api: HttpApiConfig::default(),
            // Generates a fresh random process id on every call.
            instance: InstanceConfig::default(),
            logging: None,
            metrics: MetricsConfig::default(),
            mode: "production".to_string(),
            port: 6001,
            path_prefix: "/".to_string(),
            presence: PresenceConfig::default(),
            queue: QueueConfig::default(),
            rate_limiter: RateLimiterConfig::default(),
            // NOTE(review): unit not visible here; presumably seconds — confirm.
            shutdown_grace_period: 10,
            ssl: SslConfig::default(),
            // NOTE(review): unit not visible here; presumably seconds — confirm.
            user_authentication_timeout: 3600,
            webhooks: WebhooksConfig::default(),
            websocket_max_payload_kb: 64,
            cleanup: CleanupConfig::default(),
            // NOTE(review): unit not visible here; presumably seconds — confirm.
            activity_timeout: 120,
            cluster_health: ClusterHealthConfig::default(),
            unix_socket: UnixSocketConfig::default(),
            delta_compression: DeltaCompressionOptionsConfig::default(),
            websocket: WebSocketConfig::default(),
            connection_recovery: ConnectionRecoveryConfig::default(),
            idempotency: IdempotencyConfig::default(),
            ephemeral: EphemeralConfig::default(),
            echo_control: EchoControlConfig::default(),
            event_name_filtering: EventNameFilteringConfig::default(),
        }
    }
}
1499
1500impl Default for SqsQueueConfig {
1501 fn default() -> Self {
1502 Self {
1503 region: "us-east-1".to_string(),
1504 queue_url_prefix: None,
1505 visibility_timeout: 30,
1506 endpoint_url: None,
1507 max_messages: 10,
1508 wait_time_seconds: 5,
1509 concurrency: 5,
1510 fifo: false,
1511 message_group_id: Some("default".to_string()),
1512 }
1513 }
1514}
1515
1516impl Default for SnsQueueConfig {
1517 fn default() -> Self {
1518 Self {
1519 region: "us-east-1".to_string(),
1520 topic_arn: String::new(),
1521 endpoint_url: None,
1522 }
1523 }
1524}
1525
1526impl Default for RedisAdapterConfig {
1527 fn default() -> Self {
1528 Self {
1529 requests_timeout: 5000,
1530 prefix: "sockudo_adapter:".to_string(),
1531 redis_pub_options: AHashMap::new(),
1532 redis_sub_options: AHashMap::new(),
1533 cluster_mode: false,
1534 }
1535 }
1536}
1537
1538impl Default for RedisClusterAdapterConfig {
1539 fn default() -> Self {
1540 Self {
1541 nodes: vec![],
1542 prefix: "sockudo_adapter:".to_string(),
1543 request_timeout_ms: 1000,
1544 use_connection_manager: true,
1545 use_sharded_pubsub: false,
1546 }
1547 }
1548}
1549
1550impl Default for NatsAdapterConfig {
1551 fn default() -> Self {
1552 Self {
1553 servers: vec!["nats://localhost:4222".to_string()],
1554 prefix: "sockudo_adapter:".to_string(),
1555 request_timeout_ms: 5000,
1556 username: None,
1557 password: None,
1558 token: None,
1559 connection_timeout_ms: 5000,
1560 nodes_number: None,
1561 discovery_max_wait_ms: 1000,
1562 discovery_idle_wait_ms: 150,
1563 }
1564 }
1565}
1566
1567impl Default for PulsarAdapterConfig {
1568 fn default() -> Self {
1569 Self {
1570 url: "pulsar://127.0.0.1:6650".to_string(),
1571 prefix: "sockudo-adapter".to_string(),
1572 request_timeout_ms: 5000,
1573 token: None,
1574 nodes_number: None,
1575 }
1576 }
1577}
1578
1579impl Default for RabbitMqAdapterConfig {
1580 fn default() -> Self {
1581 Self {
1582 url: "amqp://guest:guest@127.0.0.1:5672/%2f".to_string(),
1583 prefix: "sockudo_adapter".to_string(),
1584 request_timeout_ms: 5000,
1585 connection_timeout_ms: 5000,
1586 nodes_number: None,
1587 }
1588 }
1589}
1590
1591impl Default for GooglePubSubAdapterConfig {
1592 fn default() -> Self {
1593 Self {
1594 project_id: "".to_string(),
1595 prefix: "sockudo-adapter".to_string(),
1596 request_timeout_ms: 5000,
1597 emulator_host: None,
1598 nodes_number: None,
1599 }
1600 }
1601}
1602
1603impl Default for KafkaAdapterConfig {
1604 fn default() -> Self {
1605 Self {
1606 brokers: vec!["localhost:9092".to_string()],
1607 prefix: "sockudo_adapter".to_string(),
1608 request_timeout_ms: 5000,
1609 security_protocol: None,
1610 sasl_mechanism: None,
1611 sasl_username: None,
1612 sasl_password: None,
1613 nodes_number: None,
1614 }
1615 }
1616}
1617
1618impl Default for CacheSettings {
1619 fn default() -> Self {
1620 Self {
1621 enabled: true,
1622 ttl: 300,
1623 }
1624 }
1625}
1626
1627impl Default for MemoryCacheOptions {
1628 fn default() -> Self {
1629 Self {
1630 ttl: 300,
1631 cleanup_interval: 60,
1632 max_capacity: 10000,
1633 }
1634 }
1635}
1636
1637impl Default for CacheConfig {
1638 fn default() -> Self {
1639 Self {
1640 driver: CacheDriver::default(),
1641 redis: RedisConfig {
1642 prefix: Some("sockudo_cache:".to_string()),
1643 url_override: None,
1644 cluster_mode: false,
1645 },
1646 memory: MemoryCacheOptions::default(),
1647 }
1648 }
1649}
1650
1651impl Default for ChannelLimits {
1652 fn default() -> Self {
1653 Self {
1654 max_name_length: 200,
1655 cache_ttl: 3600,
1656 }
1657 }
1658}
1659impl Default for CorsConfig {
1660 fn default() -> Self {
1661 Self {
1662 credentials: true,
1663 origin: vec!["*".to_string()],
1664 methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
1665 allowed_headers: vec![
1666 "Authorization".to_string(),
1667 "Content-Type".to_string(),
1668 "X-Requested-With".to_string(),
1669 "Accept".to_string(),
1670 ],
1671 }
1672 }
1673}
1674
1675impl Default for DatabaseConnection {
1676 fn default() -> Self {
1677 Self {
1678 host: "localhost".to_string(),
1679 port: 3306,
1680 username: "root".to_string(),
1681 password: "".to_string(),
1682 database: "sockudo".to_string(),
1683 table_name: "applications".to_string(),
1684 connection_pool_size: 10,
1685 pool_min: None,
1686 pool_max: None,
1687 cache_ttl: 300,
1688 cache_cleanup_interval: 60,
1689 cache_max_capacity: 100,
1690 }
1691 }
1692}
1693
1694impl Default for RedisConnection {
1695 fn default() -> Self {
1696 Self {
1697 host: "127.0.0.1".to_string(),
1698 port: 6379,
1699 db: 0,
1700 username: None,
1701 password: None,
1702 key_prefix: "sockudo:".to_string(),
1703 sentinels: Vec::new(),
1704 sentinel_password: None,
1705 name: "mymaster".to_string(),
1706 cluster: RedisClusterConnection::default(),
1707 cluster_nodes: Vec::new(),
1708 }
1709 }
1710}
1711
1712impl Default for RedisSentinel {
1713 fn default() -> Self {
1714 Self {
1715 host: "localhost".to_string(),
1716 port: 26379,
1717 }
1718 }
1719}
1720
1721impl Default for ClusterNode {
1722 fn default() -> Self {
1723 Self {
1724 host: "127.0.0.1".to_string(),
1725 port: 7000,
1726 }
1727 }
1728}
1729
1730impl Default for DatabasePooling {
1731 fn default() -> Self {
1732 Self {
1733 enabled: true,
1734 min: 2,
1735 max: 10,
1736 }
1737 }
1738}
1739
1740impl Default for EventLimits {
1741 fn default() -> Self {
1742 Self {
1743 max_channels_at_once: 100,
1744 max_name_length: 200,
1745 max_payload_in_kb: 100,
1746 max_batch_size: 10,
1747 }
1748 }
1749}
1750
1751impl Default for IdempotencyConfig {
1752 fn default() -> Self {
1753 Self {
1754 enabled: true,
1755 ttl_seconds: 120,
1756 max_key_length: 128,
1757 }
1758 }
1759}
1760
1761impl Default for EphemeralConfig {
1762 fn default() -> Self {
1763 Self { enabled: true }
1764 }
1765}
1766
1767impl Default for EchoControlConfig {
1768 fn default() -> Self {
1769 Self {
1770 enabled: true,
1771 default_echo_messages: true,
1772 }
1773 }
1774}
1775
1776impl Default for EventNameFilteringConfig {
1777 fn default() -> Self {
1778 Self {
1779 enabled: true,
1780 max_events_per_filter: 50,
1781 max_event_name_length: 200,
1782 }
1783 }
1784}
1785
1786impl Default for ConnectionRecoveryConfig {
1787 fn default() -> Self {
1788 Self {
1789 enabled: false,
1790 buffer_ttl_seconds: 120,
1791 max_buffer_size: 100,
1792 }
1793 }
1794}
1795
1796impl Default for HttpApiConfig {
1797 fn default() -> Self {
1798 Self {
1799 request_limit_in_mb: 10,
1800 accept_traffic: AcceptTraffic::default(),
1801 usage_enabled: true,
1802 }
1803 }
1804}
1805
1806impl Default for AcceptTraffic {
1807 fn default() -> Self {
1808 Self {
1809 memory_threshold: 0.90,
1810 }
1811 }
1812}
1813
1814impl Default for InstanceConfig {
1815 fn default() -> Self {
1816 Self {
1817 process_id: uuid::Uuid::new_v4().to_string(),
1818 }
1819 }
1820}
1821
1822impl Default for LoggingConfig {
1823 fn default() -> Self {
1824 Self {
1825 colors_enabled: true,
1826 include_target: true,
1827 }
1828 }
1829}
1830
1831impl Default for MetricsConfig {
1832 fn default() -> Self {
1833 Self {
1834 enabled: true,
1835 driver: MetricsDriver::default(),
1836 host: "0.0.0.0".to_string(),
1837 prometheus: PrometheusConfig::default(),
1838 port: 9601,
1839 }
1840 }
1841}
1842
1843impl Default for PrometheusConfig {
1844 fn default() -> Self {
1845 Self {
1846 prefix: "sockudo_".to_string(),
1847 }
1848 }
1849}
1850
1851impl Default for PresenceConfig {
1852 fn default() -> Self {
1853 Self {
1854 max_members_per_channel: 100,
1855 max_member_size_in_kb: 2,
1856 }
1857 }
1858}
1859
1860impl Default for RedisQueueConfig {
1861 fn default() -> Self {
1862 Self {
1863 concurrency: 5,
1864 prefix: Some("sockudo_queue:".to_string()),
1865 url_override: None,
1866 cluster_mode: false,
1867 }
1868 }
1869}
1870
1871impl Default for RateLimit {
1872 fn default() -> Self {
1873 Self {
1874 max_requests: 60,
1875 window_seconds: 60,
1876 identifier: Some("default".to_string()),
1877 trust_hops: Some(0),
1878 }
1879 }
1880}
1881
1882impl Default for RateLimiterConfig {
1883 fn default() -> Self {
1884 Self {
1885 enabled: true,
1886 driver: CacheDriver::Memory,
1887 api_rate_limit: RateLimit {
1888 max_requests: 100,
1889 window_seconds: 60,
1890 identifier: Some("api".to_string()),
1891 trust_hops: Some(0),
1892 },
1893 websocket_rate_limit: RateLimit {
1894 max_requests: 20,
1895 window_seconds: 60,
1896 identifier: Some("websocket_connect".to_string()),
1897 trust_hops: Some(0),
1898 },
1899 redis: RedisConfig {
1900 prefix: Some("sockudo_rl:".to_string()),
1901 url_override: None,
1902 cluster_mode: false,
1903 },
1904 }
1905 }
1906}
1907
1908impl Default for SslConfig {
1909 fn default() -> Self {
1910 Self {
1911 enabled: false,
1912 cert_path: "".to_string(),
1913 key_path: "".to_string(),
1914 passphrase: None,
1915 ca_path: None,
1916 redirect_http: false,
1917 http_port: Some(80),
1918 }
1919 }
1920}
1921
1922impl Default for BatchingConfig {
1923 fn default() -> Self {
1924 Self {
1925 enabled: true,
1926 duration: 50,
1927 size: 100,
1928 }
1929 }
1930}
1931
1932impl Default for ClusterHealthConfig {
1933 fn default() -> Self {
1934 Self {
1935 enabled: true,
1936 heartbeat_interval_ms: 10000,
1937 node_timeout_ms: 30000,
1938 cleanup_interval_ms: 10000,
1939 }
1940 }
1941}
1942
1943impl Default for UnixSocketConfig {
1944 fn default() -> Self {
1945 Self {
1946 enabled: false,
1947 path: "/var/run/sockudo/sockudo.sock".to_string(),
1948 permission_mode: 0o660,
1949 }
1950 }
1951}
1952
1953impl Default for DeltaCompressionOptionsConfig {
1954 fn default() -> Self {
1955 Self {
1956 enabled: true,
1957 algorithm: "fossil".to_string(),
1958 full_message_interval: 10,
1959 min_message_size: 100,
1960 max_state_age_secs: 300,
1961 max_channel_states_per_socket: 100,
1962 max_conflation_states_per_channel: Some(100),
1963 conflation_key_path: None,
1964 cluster_coordination: false,
1965 omit_delta_algorithm: false,
1966 }
1967 }
1968}
1969
1970impl ClusterHealthConfig {
1971 pub fn validate(&self) -> Result<(), String> {
1972 if self.heartbeat_interval_ms == 0 {
1973 return Err("heartbeat_interval_ms must be greater than 0".to_string());
1974 }
1975 if self.node_timeout_ms == 0 {
1976 return Err("node_timeout_ms must be greater than 0".to_string());
1977 }
1978 if self.cleanup_interval_ms == 0 {
1979 return Err("cleanup_interval_ms must be greater than 0".to_string());
1980 }
1981
1982 if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
1983 return Err(format!(
1984 "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
1985 self.heartbeat_interval_ms,
1986 self.node_timeout_ms,
1987 self.node_timeout_ms / 3
1988 ));
1989 }
1990
1991 if self.cleanup_interval_ms > self.node_timeout_ms {
1992 return Err(format!(
1993 "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
1994 self.cleanup_interval_ms, self.node_timeout_ms
1995 ));
1996 }
1997
1998 Ok(())
1999 }
2000}
2001
2002impl ServerOptions {
2003 pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
2004 let content = tokio::fs::read_to_string(path).await?;
2005 let options: Self = if path.ends_with(".toml") {
2006 toml::from_str(&content)?
2007 } else {
2008 sonic_rs::from_str(&content)?
2010 };
2011 Ok(options)
2012 }
2013
2014 pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
2015 if let Ok(mode) = std::env::var("ENVIRONMENT") {
2017 self.mode = mode;
2018 }
2019 self.debug = parse_bool_env("DEBUG_MODE", self.debug);
2020 if parse_bool_env("DEBUG", false) {
2021 self.debug = true;
2022 info!("DEBUG environment variable forces debug mode ON");
2023 }
2024
2025 self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
2026
2027 if let Ok(host) = std::env::var("HOST") {
2028 self.host = host;
2029 }
2030 self.port = parse_env::<u16>("PORT", self.port);
2031 self.shutdown_grace_period =
2032 parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
2033 self.user_authentication_timeout = parse_env::<u64>(
2034 "USER_AUTHENTICATION_TIMEOUT",
2035 self.user_authentication_timeout,
2036 );
2037 self.websocket_max_payload_kb =
2038 parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
2039 if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
2040 self.instance.process_id = id;
2041 }
2042
2043 if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
2045 self.adapter.driver =
2046 parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
2047 }
2048 self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
2049 "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
2050 self.adapter.buffer_multiplier_per_cpu,
2051 );
2052 self.adapter.enable_socket_counting = parse_env::<bool>(
2053 "ADAPTER_ENABLE_SOCKET_COUNTING",
2054 self.adapter.enable_socket_counting,
2055 );
2056 self.adapter.fallback_to_local =
2057 parse_env::<bool>("ADAPTER_FALLBACK_TO_LOCAL", self.adapter.fallback_to_local);
2058 if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
2059 self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
2060 }
2061 if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
2062 self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
2063 }
2064 if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
2065 self.app_manager.driver =
2066 parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
2067 }
2068 if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
2069 self.rate_limiter.driver = parse_driver_enum(
2070 driver_str,
2071 self.rate_limiter.driver.clone(),
2072 "RateLimiter Backend",
2073 );
2074 }
2075
2076 if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
2078 self.database.redis.host = host;
2079 }
2080 self.database.redis.port =
2081 parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
2082 if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
2083 self.database.redis.username = if username.is_empty() {
2084 None
2085 } else {
2086 Some(username)
2087 };
2088 }
2089 if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
2090 self.database.redis.password = Some(password);
2091 }
2092 self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
2093 if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
2094 self.database.redis.key_prefix = prefix;
2095 }
2096 if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
2097 self.database.redis.cluster.username = if cluster_username.is_empty() {
2098 None
2099 } else {
2100 Some(cluster_username)
2101 };
2102 }
2103 if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
2104 self.database.redis.cluster.password = Some(cluster_password);
2105 }
2106 self.database.redis.cluster.use_tls = parse_bool_env(
2107 "DATABASE_REDIS_CLUSTER_USE_TLS",
2108 self.database.redis.cluster.use_tls,
2109 );
2110
2111 if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
2113 self.database.mysql.host = host;
2114 }
2115 self.database.mysql.port =
2116 parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
2117 if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
2118 self.database.mysql.username = user;
2119 }
2120 if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
2121 self.database.mysql.password = pass;
2122 }
2123 if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
2124 self.database.mysql.database = db;
2125 }
2126 if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
2127 self.database.mysql.table_name = table;
2128 }
2129 override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
2130
2131 if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
2133 self.database.postgres.host = host;
2134 }
2135 self.database.postgres.port =
2136 parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
2137 if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
2138 self.database.postgres.username = user;
2139 }
2140 if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
2141 self.database.postgres.password = pass;
2142 }
2143 if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
2144 self.database.postgres.database = db;
2145 }
2146 override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
2147
2148 if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
2150 self.database.dynamodb.region = region;
2151 }
2152 if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
2153 self.database.dynamodb.table_name = table;
2154 }
2155 if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
2156 self.database.dynamodb.endpoint_url = Some(endpoint);
2157 }
2158 if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
2159 self.database.dynamodb.aws_access_key_id = Some(key_id);
2160 }
2161 if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
2162 self.database.dynamodb.aws_secret_access_key = Some(secret);
2163 }
2164
2165 if let Ok(url) = std::env::var("DATABASE_SURREALDB_URL") {
2167 self.database.surrealdb.url = url;
2168 }
2169 if let Ok(namespace) = std::env::var("DATABASE_SURREALDB_NAMESPACE") {
2170 self.database.surrealdb.namespace = namespace;
2171 }
2172 if let Ok(database) = std::env::var("DATABASE_SURREALDB_DATABASE") {
2173 self.database.surrealdb.database = database;
2174 }
2175 if let Ok(username) = std::env::var("DATABASE_SURREALDB_USERNAME") {
2176 self.database.surrealdb.username = username;
2177 }
2178 if let Ok(password) = std::env::var("DATABASE_SURREALDB_PASSWORD") {
2179 self.database.surrealdb.password = password;
2180 }
2181 if let Ok(table) = std::env::var("DATABASE_SURREALDB_TABLE_NAME") {
2182 self.database.surrealdb.table_name = table;
2183 }
2184
2185 let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
2187 let node_list: Vec<String> = nodes
2188 .split(',')
2189 .map(|s| s.trim())
2190 .filter(|s| !s.is_empty())
2191 .map(ToString::to_string)
2192 .collect();
2193
2194 options.adapter.cluster.nodes = node_list.clone();
2195 options.queue.redis_cluster.nodes = node_list.clone();
2196
2197 let parsed_nodes: Vec<ClusterNode> = node_list
2198 .iter()
2199 .filter_map(|seed| ClusterNode::from_seed(seed))
2200 .collect();
2201 options.database.redis.cluster.nodes = parsed_nodes.clone();
2202 options.database.redis.cluster_nodes = parsed_nodes;
2203 };
2204
2205 if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
2206 apply_redis_cluster_nodes(self, &nodes);
2207 }
2208 if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
2209 apply_redis_cluster_nodes(self, &nodes);
2210 }
2211 self.queue.redis_cluster.concurrency = parse_env::<u32>(
2212 "REDIS_CLUSTER_QUEUE_CONCURRENCY",
2213 self.queue.redis_cluster.concurrency,
2214 );
2215 if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
2216 self.queue.redis_cluster.prefix = Some(prefix);
2217 }
2218
2219 self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
2221 if let Ok(val) = std::env::var("SSL_CERT_PATH") {
2222 self.ssl.cert_path = val;
2223 }
2224 if let Ok(val) = std::env::var("SSL_KEY_PATH") {
2225 self.ssl.key_path = val;
2226 }
2227 self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
2228 if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
2229 self.ssl.http_port = Some(port);
2230 }
2231
2232 self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
2234 if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
2235 self.unix_socket.path = path;
2236 }
2237 if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
2238 if mode_str.chars().all(|c| c.is_digit(8)) {
2239 if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
2240 if mode <= 0o777 {
2241 self.unix_socket.permission_mode = mode;
2242 } else {
2243 warn!(
2244 "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
2245 mode_str, self.unix_socket.permission_mode
2246 );
2247 }
2248 } else {
2249 warn!(
2250 "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
2251 mode_str, self.unix_socket.permission_mode
2252 );
2253 }
2254 } else {
2255 warn!(
2256 "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
2257 mode_str, self.unix_socket.permission_mode
2258 );
2259 }
2260 }
2261
2262 if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
2264 self.metrics.driver =
2265 parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
2266 }
2267 self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
2268 if let Ok(val) = std::env::var("METRICS_HOST") {
2269 self.metrics.host = val;
2270 }
2271 self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
2272 if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
2273 self.metrics.prometheus.prefix = val;
2274 }
2275
2276 self.http_api.usage_enabled =
2278 parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
2279
2280 self.rate_limiter.enabled =
2282 parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
2283 self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
2284 "RATE_LIMITER_API_MAX_REQUESTS",
2285 self.rate_limiter.api_rate_limit.max_requests,
2286 );
2287 self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
2288 "RATE_LIMITER_API_WINDOW_SECONDS",
2289 self.rate_limiter.api_rate_limit.window_seconds,
2290 );
2291 if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
2292 self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
2293 }
2294 self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
2295 "RATE_LIMITER_WS_MAX_REQUESTS",
2296 self.rate_limiter.websocket_rate_limit.max_requests,
2297 );
2298 self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
2299 "RATE_LIMITER_WS_WINDOW_SECONDS",
2300 self.rate_limiter.websocket_rate_limit.window_seconds,
2301 );
2302 if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
2303 self.rate_limiter.redis.prefix = Some(prefix);
2304 }
2305
2306 self.queue.redis.concurrency =
2308 parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
2309 if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
2310 self.queue.redis.prefix = Some(prefix);
2311 }
2312
2313 if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
2315 self.queue.sqs.region = region;
2316 }
2317 self.queue.sqs.visibility_timeout = parse_env::<i32>(
2318 "QUEUE_SQS_VISIBILITY_TIMEOUT",
2319 self.queue.sqs.visibility_timeout,
2320 );
2321 self.queue.sqs.max_messages =
2322 parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
2323 self.queue.sqs.wait_time_seconds = parse_env::<i32>(
2324 "QUEUE_SQS_WAIT_TIME_SECONDS",
2325 self.queue.sqs.wait_time_seconds,
2326 );
2327 self.queue.sqs.concurrency =
2328 parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
2329 self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
2330 if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
2331 self.queue.sqs.endpoint_url = Some(endpoint);
2332 }
2333
2334 if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
2336 self.queue.sns.region = region;
2337 }
2338 if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
2339 self.queue.sns.topic_arn = topic_arn;
2340 }
2341 if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
2342 self.queue.sns.endpoint_url = Some(endpoint);
2343 }
2344
2345 self.webhooks.batching.enabled =
2347 parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
2348 self.webhooks.batching.duration =
2349 parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
2350 self.webhooks.batching.size =
2351 parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
2352
2353 if let Ok(servers) = std::env::var("NATS_SERVERS") {
2355 self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2356 }
2357 if let Ok(user) = std::env::var("NATS_USERNAME") {
2358 self.adapter.nats.username = Some(user);
2359 }
2360 if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2361 self.adapter.nats.password = Some(pass);
2362 }
2363 if let Ok(token) = std::env::var("NATS_TOKEN") {
2364 self.adapter.nats.token = Some(token);
2365 }
2366 if let Ok(prefix) = std::env::var("NATS_PREFIX") {
2367 self.adapter.nats.prefix = prefix;
2368 }
2369 self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2370 "NATS_CONNECTION_TIMEOUT_MS",
2371 self.adapter.nats.connection_timeout_ms,
2372 );
2373 self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2374 "NATS_REQUEST_TIMEOUT_MS",
2375 self.adapter.nats.request_timeout_ms,
2376 );
2377 self.adapter.nats.discovery_max_wait_ms = parse_env::<u64>(
2378 "NATS_DISCOVERY_MAX_WAIT_MS",
2379 self.adapter.nats.discovery_max_wait_ms,
2380 );
2381 self.adapter.nats.discovery_idle_wait_ms = parse_env::<u64>(
2382 "NATS_DISCOVERY_IDLE_WAIT_MS",
2383 self.adapter.nats.discovery_idle_wait_ms,
2384 );
2385 if let Some(nodes) = parse_env_optional::<u32>("NATS_NODES_NUMBER") {
2386 self.adapter.nats.nodes_number = Some(nodes);
2387 }
2388
2389 if let Ok(url) = std::env::var("PULSAR_URL") {
2391 self.adapter.pulsar.url = url;
2392 }
2393 if let Ok(prefix) = std::env::var("PULSAR_PREFIX") {
2394 self.adapter.pulsar.prefix = prefix;
2395 }
2396 if let Ok(token) = std::env::var("PULSAR_TOKEN") {
2397 self.adapter.pulsar.token = Some(token);
2398 }
2399 self.adapter.pulsar.request_timeout_ms = parse_env::<u64>(
2400 "PULSAR_REQUEST_TIMEOUT_MS",
2401 self.adapter.pulsar.request_timeout_ms,
2402 );
2403 if let Some(nodes) = parse_env_optional::<u32>("PULSAR_NODES_NUMBER") {
2404 self.adapter.pulsar.nodes_number = Some(nodes);
2405 }
2406
2407 if let Ok(url) = std::env::var("RABBITMQ_URL") {
2409 self.adapter.rabbitmq.url = url;
2410 }
2411 if let Ok(prefix) = std::env::var("RABBITMQ_PREFIX") {
2412 self.adapter.rabbitmq.prefix = prefix;
2413 }
2414 self.adapter.rabbitmq.connection_timeout_ms = parse_env::<u64>(
2415 "RABBITMQ_CONNECTION_TIMEOUT_MS",
2416 self.adapter.rabbitmq.connection_timeout_ms,
2417 );
2418 self.adapter.rabbitmq.request_timeout_ms = parse_env::<u64>(
2419 "RABBITMQ_REQUEST_TIMEOUT_MS",
2420 self.adapter.rabbitmq.request_timeout_ms,
2421 );
2422 if let Some(nodes) = parse_env_optional::<u32>("RABBITMQ_NODES_NUMBER") {
2423 self.adapter.rabbitmq.nodes_number = Some(nodes);
2424 }
2425
2426 if let Ok(project_id) = std::env::var("GOOGLE_PUBSUB_PROJECT_ID") {
2428 self.adapter.google_pubsub.project_id = project_id;
2429 }
2430 if let Ok(prefix) = std::env::var("GOOGLE_PUBSUB_PREFIX") {
2431 self.adapter.google_pubsub.prefix = prefix;
2432 }
2433 if let Ok(emulator_host) = std::env::var("PUBSUB_EMULATOR_HOST") {
2434 self.adapter.google_pubsub.emulator_host = Some(emulator_host);
2435 }
2436 self.adapter.google_pubsub.request_timeout_ms = parse_env::<u64>(
2437 "GOOGLE_PUBSUB_REQUEST_TIMEOUT_MS",
2438 self.adapter.google_pubsub.request_timeout_ms,
2439 );
2440 if let Some(nodes) = parse_env_optional::<u32>("GOOGLE_PUBSUB_NODES_NUMBER") {
2441 self.adapter.google_pubsub.nodes_number = Some(nodes);
2442 }
2443
2444 if let Ok(brokers) = std::env::var("KAFKA_BROKERS") {
2446 self.adapter.kafka.brokers = brokers.split(',').map(|s| s.trim().to_string()).collect();
2447 }
2448 if let Ok(prefix) = std::env::var("KAFKA_PREFIX") {
2449 self.adapter.kafka.prefix = prefix;
2450 }
2451 if let Ok(protocol) = std::env::var("KAFKA_SECURITY_PROTOCOL") {
2452 self.adapter.kafka.security_protocol = Some(protocol);
2453 }
2454 if let Ok(mechanism) = std::env::var("KAFKA_SASL_MECHANISM") {
2455 self.adapter.kafka.sasl_mechanism = Some(mechanism);
2456 }
2457 if let Ok(username) = std::env::var("KAFKA_SASL_USERNAME") {
2458 self.adapter.kafka.sasl_username = Some(username);
2459 }
2460 if let Ok(password) = std::env::var("KAFKA_SASL_PASSWORD") {
2461 self.adapter.kafka.sasl_password = Some(password);
2462 }
2463 self.adapter.kafka.request_timeout_ms = parse_env::<u64>(
2464 "KAFKA_REQUEST_TIMEOUT_MS",
2465 self.adapter.kafka.request_timeout_ms,
2466 );
2467 if let Some(nodes) = parse_env_optional::<u32>("KAFKA_NODES_NUMBER") {
2468 self.adapter.kafka.nodes_number = Some(nodes);
2469 }
2470
2471 if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2473 let parsed: Vec<String> = origins.split(',').map(|s| s.trim().to_string()).collect();
2474 if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&parsed) {
2475 warn!(
2476 "CORS_ORIGINS contains invalid patterns: {}. Keeping previous CORS origins.",
2477 e
2478 );
2479 } else {
2480 self.cors.origin = parsed;
2481 }
2482 }
2483 if let Ok(methods) = std::env::var("CORS_METHODS") {
2484 self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2485 }
2486 if let Ok(headers) = std::env::var("CORS_HEADERS") {
2487 self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2488 }
2489 self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2490
2491 self.database_pooling.enabled =
2493 parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
2494 if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
2495 self.database_pooling.min = min;
2496 }
2497 if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
2498 self.database_pooling.max = max;
2499 }
2500
2501 if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
2502 self.database.mysql.connection_pool_size = pool_size;
2503 self.database.postgres.connection_pool_size = pool_size;
2504 }
2505 if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
2506 self.app_manager.cache.ttl = cache_ttl;
2507 self.channel_limits.cache_ttl = cache_ttl;
2508 self.database.mysql.cache_ttl = cache_ttl;
2509 self.database.postgres.cache_ttl = cache_ttl;
2510 self.database.surrealdb.cache_ttl = cache_ttl;
2511 self.cache.memory.ttl = cache_ttl;
2512 }
2513 if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
2514 self.database.mysql.cache_cleanup_interval = cleanup_interval;
2515 self.database.postgres.cache_cleanup_interval = cleanup_interval;
2516 self.cache.memory.cleanup_interval = cleanup_interval;
2517 }
2518 if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
2519 self.database.mysql.cache_max_capacity = max_capacity;
2520 self.database.postgres.cache_max_capacity = max_capacity;
2521 self.database.surrealdb.cache_max_capacity = max_capacity;
2522 self.cache.memory.max_capacity = max_capacity;
2523 }
2524
2525 let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
2526 let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
2527 let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
2528 let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
2529
2530 if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
2531 (default_app_id, default_app_key, default_app_secret)
2532 && default_app_enabled
2533 {
2534 let default_app = App::from_policy(
2535 app_id,
2536 app_key,
2537 app_secret,
2538 default_app_enabled,
2539 crate::app::AppPolicy {
2540 limits: crate::app::AppLimitsPolicy {
2541 max_connections: parse_env::<u32>(
2542 "SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS",
2543 100,
2544 ),
2545 max_backend_events_per_second: Some(parse_env::<u32>(
2546 "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
2547 100,
2548 )),
2549 max_client_events_per_second: parse_env::<u32>(
2550 "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
2551 100,
2552 ),
2553 max_read_requests_per_second: Some(parse_env::<u32>(
2554 "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
2555 100,
2556 )),
2557 max_presence_members_per_channel: Some(parse_env::<u32>(
2558 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
2559 100,
2560 )),
2561 max_presence_member_size_in_kb: Some(parse_env::<u32>(
2562 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
2563 100,
2564 )),
2565 max_channel_name_length: Some(parse_env::<u32>(
2566 "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
2567 100,
2568 )),
2569 max_event_channels_at_once: Some(parse_env::<u32>(
2570 "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
2571 100,
2572 )),
2573 max_event_name_length: Some(parse_env::<u32>(
2574 "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
2575 100,
2576 )),
2577 max_event_payload_in_kb: Some(parse_env::<u32>(
2578 "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
2579 100,
2580 )),
2581 max_event_batch_size: Some(parse_env::<u32>(
2582 "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
2583 100,
2584 )),
2585 decay_seconds: None,
2586 terminate_on_limit: false,
2587 message_rate_limit: None,
2588 },
2589 features: crate::app::AppFeaturesPolicy {
2590 enable_client_messages: std::env::var(
2591 "SOCKUDO_DEFAULT_APP_ENABLE_CLIENT_MESSAGES",
2592 )
2593 .ok()
2594 .map(|value| {
2595 matches!(
2596 value.trim().to_ascii_lowercase().as_str(),
2597 "true" | "1" | "yes" | "on"
2598 )
2599 })
2600 .unwrap_or_else(|| parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false)),
2601 enable_user_authentication: Some(parse_bool_env(
2602 "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
2603 false,
2604 )),
2605 enable_watchlist_events: Some(parse_bool_env(
2606 "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
2607 false,
2608 )),
2609 },
2610 channels: crate::app::AppChannelsPolicy {
2611 allowed_origins: {
2612 if let Ok(origins_str) =
2613 std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS")
2614 {
2615 if !origins_str.is_empty() {
2616 Some(
2617 origins_str
2618 .split(',')
2619 .map(|s| s.trim().to_string())
2620 .collect(),
2621 )
2622 } else {
2623 None
2624 }
2625 } else {
2626 None
2627 }
2628 },
2629 channel_delta_compression: None,
2630 channel_namespaces: None,
2631 },
2632 webhooks: None,
2633 idempotency: None,
2634 connection_recovery: None,
2635 },
2636 );
2637
2638 self.app_manager.array.apps.push(default_app);
2639 info!("Successfully registered default app from env");
2640 }
2641
2642 if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
2644 info!("Applying REDIS_URL environment variable override");
2645
2646 let redis_url_json = sonic_rs::json!(redis_url_env);
2647
2648 self.adapter
2649 .redis
2650 .redis_pub_options
2651 .insert("url".to_string(), redis_url_json.clone());
2652 self.adapter
2653 .redis
2654 .redis_sub_options
2655 .insert("url".to_string(), redis_url_json);
2656
2657 self.cache.redis.url_override = Some(redis_url_env.clone());
2658 self.queue.redis.url_override = Some(redis_url_env.clone());
2659 self.rate_limiter.redis.url_override = Some(redis_url_env);
2660 }
2661
2662 let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
2664 let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
2665 if has_colors_env || has_target_env {
2666 let logging_config = self.logging.get_or_insert_with(Default::default);
2667 if has_colors_env {
2668 logging_config.colors_enabled =
2669 parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
2670 }
2671 if has_target_env {
2672 logging_config.include_target =
2673 parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
2674 }
2675 }
2676
2677 self.cleanup.async_enabled =
2679 parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
2680 self.cleanup.fallback_to_sync =
2681 parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
2682 self.cleanup.queue_buffer_size =
2683 parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
2684 self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
2685 self.cleanup.batch_timeout_ms =
2686 parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
2687 if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
2688 self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
2689 WorkerThreadsConfig::Auto
2690 } else if let Ok(n) = worker_threads_str.parse::<usize>() {
2691 WorkerThreadsConfig::Fixed(n)
2692 } else {
2693 warn!(
2694 "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
2695 worker_threads_str
2696 );
2697 self.cleanup.worker_threads.clone()
2698 };
2699 }
2700 self.cleanup.max_retry_attempts = parse_env::<u32>(
2701 "CLEANUP_MAX_RETRY_ATTEMPTS",
2702 self.cleanup.max_retry_attempts,
2703 );
2704
2705 self.cluster_health.enabled =
2707 parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
2708 self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
2709 "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
2710 self.cluster_health.heartbeat_interval_ms,
2711 );
2712 self.cluster_health.node_timeout_ms = parse_env::<u64>(
2713 "CLUSTER_HEALTH_NODE_TIMEOUT",
2714 self.cluster_health.node_timeout_ms,
2715 );
2716 self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
2717 "CLUSTER_HEALTH_CLEANUP_INTERVAL",
2718 self.cluster_health.cleanup_interval_ms,
2719 );
2720
2721 self.tag_filtering.enabled =
2723 parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
2724
2725 if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
2727 if val.to_lowercase() == "none" || val == "0" {
2728 self.websocket.max_messages = None;
2729 } else if let Ok(n) = val.parse::<usize>() {
2730 self.websocket.max_messages = Some(n);
2731 }
2732 }
2733 if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
2734 if val.to_lowercase() == "none" || val == "0" {
2735 self.websocket.max_bytes = None;
2736 } else if let Ok(n) = val.parse::<usize>() {
2737 self.websocket.max_bytes = Some(n);
2738 }
2739 }
2740 self.websocket.disconnect_on_buffer_full = parse_bool_env(
2741 "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
2742 self.websocket.disconnect_on_buffer_full,
2743 );
2744 self.websocket.max_message_size = parse_env::<usize>(
2745 "WEBSOCKET_MAX_MESSAGE_SIZE",
2746 self.websocket.max_message_size,
2747 );
2748 self.websocket.max_frame_size =
2749 parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
2750 self.websocket.write_buffer_size = parse_env::<usize>(
2751 "WEBSOCKET_WRITE_BUFFER_SIZE",
2752 self.websocket.write_buffer_size,
2753 );
2754 self.websocket.max_backpressure = parse_env::<usize>(
2755 "WEBSOCKET_MAX_BACKPRESSURE",
2756 self.websocket.max_backpressure,
2757 );
2758 self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
2759 self.websocket.ping_interval =
2760 parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
2761 self.websocket.idle_timeout =
2762 parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
2763 if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
2764 self.websocket.compression = mode;
2765 }
2766
2767 self.connection_recovery.enabled = parse_bool_env(
2769 "CONNECTION_RECOVERY_ENABLED",
2770 self.connection_recovery.enabled,
2771 );
2772 self.connection_recovery.buffer_ttl_seconds = parse_env::<u64>(
2773 "CONNECTION_RECOVERY_BUFFER_TTL",
2774 self.connection_recovery.buffer_ttl_seconds,
2775 );
2776 self.connection_recovery.max_buffer_size = parse_env::<usize>(
2777 "CONNECTION_RECOVERY_MAX_BUFFER_SIZE",
2778 self.connection_recovery.max_buffer_size,
2779 );
2780
2781 self.idempotency.enabled = parse_bool_env("IDEMPOTENCY_ENABLED", self.idempotency.enabled);
2782 self.idempotency.ttl_seconds =
2783 parse_env::<u64>("IDEMPOTENCY_TTL_SECONDS", self.idempotency.ttl_seconds);
2784 self.idempotency.max_key_length = parse_env::<usize>(
2785 "IDEMPOTENCY_MAX_KEY_LENGTH",
2786 self.idempotency.max_key_length,
2787 );
2788
2789 self.ephemeral.enabled = parse_bool_env("EPHEMERAL_ENABLED", self.ephemeral.enabled);
2790
2791 self.echo_control.enabled =
2792 parse_bool_env("ECHO_CONTROL_ENABLED", self.echo_control.enabled);
2793 self.echo_control.default_echo_messages = parse_bool_env(
2794 "ECHO_CONTROL_DEFAULT_ECHO_MESSAGES",
2795 self.echo_control.default_echo_messages,
2796 );
2797
2798 self.event_name_filtering.enabled = parse_bool_env(
2799 "EVENT_NAME_FILTERING_ENABLED",
2800 self.event_name_filtering.enabled,
2801 );
2802 self.event_name_filtering.max_events_per_filter = parse_env::<usize>(
2803 "EVENT_NAME_FILTERING_MAX_EVENTS_PER_FILTER",
2804 self.event_name_filtering.max_events_per_filter,
2805 );
2806 self.event_name_filtering.max_event_name_length = parse_env::<usize>(
2807 "EVENT_NAME_FILTERING_MAX_EVENT_NAME_LENGTH",
2808 self.event_name_filtering.max_event_name_length,
2809 );
2810
2811 Ok(())
2812 }
2813
2814 pub fn validate(&self) -> Result<(), String> {
2815 if self.unix_socket.enabled {
2816 if self.unix_socket.path.is_empty() {
2817 return Err(
2818 "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
2819 );
2820 }
2821
2822 self.validate_unix_socket_security()?;
2823
2824 if self.ssl.enabled {
2825 tracing::warn!(
2826 "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
2827 );
2828 }
2829
2830 if self.unix_socket.permission_mode > 0o777 {
2831 return Err(format!(
2832 "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
2833 self.unix_socket.permission_mode
2834 ));
2835 }
2836 }
2837
2838 if let Err(e) = self.cleanup.validate() {
2839 return Err(format!("Invalid cleanup configuration: {}", e));
2840 }
2841
2842 Ok(())
2843 }
2844
2845 fn validate_unix_socket_security(&self) -> Result<(), String> {
2846 let path = &self.unix_socket.path;
2847
2848 if path.contains("../") || path.contains("..\\") {
2849 return Err(
2850 "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
2851 );
2852 }
2853
2854 if self.unix_socket.permission_mode & 0o002 != 0 {
2855 warn!(
2856 "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
2857 self.unix_socket.permission_mode
2858 );
2859 }
2860
2861 if self.unix_socket.permission_mode & 0o007 > 0o005 {
2862 warn!(
2863 "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
2864 self.unix_socket.permission_mode
2865 );
2866 }
2867
2868 if self.mode == "production" && path.starts_with("/tmp/") {
2869 warn!(
2870 "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
2871 path
2872 );
2873 }
2874
2875 if !path.starts_with('/') {
2876 return Err(
2877 "Unix socket path must be absolute (start with /) for security and reliability."
2878 .to_string(),
2879 );
2880 }
2881
2882 Ok(())
2883 }
2884}
2885
/// Unit tests for the URLs produced from `RedisConnection` settings in
/// standalone, Sentinel and Cluster configurations.
#[cfg(test)]
mod redis_connection_tests {
    use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};

    /// Baseline connection shared by the URL tests: host `127.0.0.1`, port
    /// 6379, database 0, no credentials, no Sentinel or Cluster nodes.
    /// Every field is spelled out so these tests do not depend on `Default`.
    fn local_connection() -> RedisConnection {
        RedisConnection {
            host: String::from("127.0.0.1"),
            port: 6379,
            db: 0,
            username: None,
            password: None,
            key_prefix: String::from("sockudo:"),
            sentinels: vec![],
            sentinel_password: None,
            name: String::from("mymaster"),
            cluster: RedisClusterConnection::default(),
            cluster_nodes: vec![],
        }
    }

    /// The `sentinel1:26379` entry used by most Sentinel tests.
    fn first_sentinel() -> RedisSentinel {
        RedisSentinel {
            host: String::from("sentinel1"),
            port: 26379,
        }
    }

    #[test]
    fn test_standard_url_basic() {
        let conn = local_connection();
        assert_eq!(conn.to_url(), "redis://127.0.0.1:6379/0");
    }

    #[test]
    fn test_standard_url_with_password() {
        let conn = RedisConnection {
            db: 2,
            password: Some(String::from("secret")),
            ..local_connection()
        };
        assert_eq!(conn.to_url(), "redis://:secret@127.0.0.1:6379/2");
    }

    #[test]
    fn test_standard_url_with_username_and_password() {
        let conn = RedisConnection {
            host: String::from("redis.example.com"),
            port: 6380,
            db: 1,
            username: Some(String::from("admin")),
            password: Some(String::from("pass123")),
            ..local_connection()
        };
        assert_eq!(
            conn.to_url(),
            "redis://admin:pass123@redis.example.com:6380/1"
        );
    }

    #[test]
    fn test_standard_url_with_special_chars_in_password() {
        // `@` and `#` are expected to come back percent-encoded.
        let conn = RedisConnection {
            password: Some(String::from("pass@word#123")),
            ..local_connection()
        };
        assert_eq!(conn.to_url(), "redis://:pass%40word%23123@127.0.0.1:6379/0");
    }

    #[test]
    fn test_is_sentinel_configured_false() {
        let conn = RedisConnection::default();
        assert!(!conn.is_sentinel_configured());
    }

    #[test]
    fn test_is_sentinel_configured_true() {
        let conn = RedisConnection {
            sentinels: vec![first_sentinel()],
            ..Default::default()
        };
        assert!(conn.is_sentinel_configured());
    }

    #[test]
    fn test_sentinel_url_basic() {
        let conn = RedisConnection {
            sentinels: vec![
                first_sentinel(),
                RedisSentinel {
                    host: String::from("sentinel2"),
                    port: 26379,
                },
            ],
            ..local_connection()
        };
        assert_eq!(
            conn.to_url(),
            "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0"
        );
    }

    #[test]
    fn test_sentinel_url_with_sentinel_password() {
        let conn = RedisConnection {
            sentinels: vec![first_sentinel()],
            sentinel_password: Some(String::from("sentinelpass")),
            ..local_connection()
        };
        assert_eq!(
            conn.to_url(),
            "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0"
        );
    }

    #[test]
    fn test_sentinel_url_with_master_password() {
        // The master password is expected as a query parameter, not userinfo.
        let conn = RedisConnection {
            db: 1,
            password: Some(String::from("masterpass")),
            sentinels: vec![first_sentinel()],
            ..local_connection()
        };
        assert_eq!(
            conn.to_url(),
            "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass"
        );
    }

    #[test]
    fn test_sentinel_url_with_all_auth() {
        let conn = RedisConnection {
            db: 2,
            username: Some(String::from("redisuser")),
            password: Some(String::from("redispass")),
            sentinels: vec![
                first_sentinel(),
                RedisSentinel {
                    host: String::from("sentinel2"),
                    port: 26380,
                },
            ],
            sentinel_password: Some(String::from("sentinelauth")),
            name: String::from("production-master"),
            ..local_connection()
        };
        assert_eq!(
            conn.to_url(),
            "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser"
        );
    }

    #[test]
    fn test_sentinel_to_host_port() {
        let sentinel = RedisSentinel {
            host: String::from("sentinel.example.com"),
            port: 26379,
        };
        assert_eq!(sentinel.to_host_port(), "sentinel.example.com:26379");
    }

    #[test]
    fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
        // Mix of a bare host, an explicit `redis://` URL and a `rediss://` URL.
        let nodes = vec![
            ClusterNode {
                host: String::from("node1.secure-cluster.com"),
                port: 7000,
            },
            ClusterNode {
                host: String::from("redis://node2.secure-cluster.com:7001"),
                port: 7001,
            },
            ClusterNode {
                host: String::from("rediss://node3.secure-cluster.com"),
                port: 7002,
            },
        ];
        let conn = RedisConnection {
            cluster: RedisClusterConnection {
                nodes,
                username: None,
                password: Some(String::from("cluster-secret")),
                use_tls: true,
            },
            ..Default::default()
        };

        assert_eq!(
            conn.cluster_node_urls(),
            vec![
                "rediss://:cluster-secret@node1.secure-cluster.com:7000",
                "redis://:cluster-secret@node2.secure-cluster.com:7001",
                "rediss://:cluster-secret@node3.secure-cluster.com:7002",
            ]
        );
    }

    #[test]
    fn test_cluster_node_urls_fallback_to_legacy_nodes() {
        // Only the top-level `cluster_nodes` and `password` fields are set here.
        let legacy_node = ClusterNode {
            host: String::from("legacy-node.example.com"),
            port: 7000,
        };
        let conn = RedisConnection {
            password: Some(String::from("fallback-secret")),
            cluster_nodes: vec![legacy_node],
            ..Default::default()
        };

        assert_eq!(
            conn.cluster_node_urls(),
            vec!["redis://:fallback-secret@legacy-node.example.com:7000"]
        );
    }

    #[test]
    fn test_normalize_cluster_seed_urls() {
        let conn = RedisConnection {
            cluster: RedisClusterConnection {
                nodes: vec![],
                username: Some(String::from("svc-user")),
                password: Some(String::from("svc-pass")),
                use_tls: true,
            },
            ..Default::default()
        };

        let seeds: Vec<String> = [
            "node1.example.com:7000",
            "redis://node2.example.com:7001",
            "rediss://node3.example.com",
        ]
        .iter()
        .map(|seed| seed.to_string())
        .collect();

        assert_eq!(
            conn.normalize_cluster_seed_urls(&seeds),
            vec![
                "rediss://svc-user:svc-pass@node1.example.com:7000",
                "redis://svc-user:svc-pass@node2.example.com:7001",
                "rediss://svc-user:svc-pass@node3.example.com:6379",
            ]
        );
    }
}
3180
3181#[cfg(test)]
3182mod cluster_node_tests {
3183 use super::ClusterNode;
3184
3185 #[test]
3186 fn test_to_url_basic_host() {
3187 let node = ClusterNode {
3188 host: "localhost".to_string(),
3189 port: 6379,
3190 };
3191 assert_eq!(node.to_url(), "redis://localhost:6379");
3192 }
3193
3194 #[test]
3195 fn test_to_url_ip_address() {
3196 let node = ClusterNode {
3197 host: "127.0.0.1".to_string(),
3198 port: 6379,
3199 };
3200 assert_eq!(node.to_url(), "redis://127.0.0.1:6379");
3201 }
3202
3203 #[test]
3204 fn test_to_url_with_redis_protocol() {
3205 let node = ClusterNode {
3206 host: "redis://example.com".to_string(),
3207 port: 6379,
3208 };
3209 assert_eq!(node.to_url(), "redis://example.com:6379");
3210 }
3211
3212 #[test]
3213 fn test_to_url_with_rediss_protocol() {
3214 let node = ClusterNode {
3215 host: "rediss://secure.example.com".to_string(),
3216 port: 6379,
3217 };
3218 assert_eq!(node.to_url(), "rediss://secure.example.com:6379");
3219 }
3220
3221 #[test]
3222 fn test_to_url_with_rediss_protocol_and_port_in_url() {
3223 let node = ClusterNode {
3224 host: "rediss://secure.example.com:7000".to_string(),
3225 port: 6379,
3226 };
3227 assert_eq!(node.to_url(), "rediss://secure.example.com:7000");
3228 }
3229
3230 #[test]
3231 fn test_to_url_with_redis_protocol_and_port_in_url() {
3232 let node = ClusterNode {
3233 host: "redis://example.com:7001".to_string(),
3234 port: 6379,
3235 };
3236 assert_eq!(node.to_url(), "redis://example.com:7001");
3237 }
3238
3239 #[test]
3240 fn test_to_url_with_trailing_whitespace() {
3241 let node = ClusterNode {
3242 host: " rediss://secure.example.com ".to_string(),
3243 port: 6379,
3244 };
3245 assert_eq!(node.to_url(), "rediss://secure.example.com:6379");
3246 }
3247
3248 #[test]
3249 fn test_to_url_custom_port() {
3250 let node = ClusterNode {
3251 host: "redis-cluster.example.com".to_string(),
3252 port: 7000,
3253 };
3254 assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7000");
3255 }
3256
3257 #[test]
3258 fn test_to_url_plain_host_with_port_in_host_field() {
3259 let node = ClusterNode {
3260 host: "redis-cluster.example.com:7010".to_string(),
3261 port: 7000,
3262 };
3263 assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7010");
3264 }
3265
3266 #[test]
3267 fn test_to_url_with_options_adds_auth_and_tls() {
3268 let node = ClusterNode {
3269 host: "node.example.com".to_string(),
3270 port: 7000,
3271 };
3272 assert_eq!(
3273 node.to_url_with_options(true, Some("svc-user"), Some("secret")),
3274 "rediss://svc-user:secret@node.example.com:7000"
3275 );
3276 }
3277
3278 #[test]
3279 fn test_to_url_with_options_keeps_embedded_auth() {
3280 let node = ClusterNode {
3281 host: "rediss://:node-secret@node.example.com:7000".to_string(),
3282 port: 7000,
3283 };
3284 assert_eq!(
3285 node.to_url_with_options(true, Some("global-user"), Some("global-secret")),
3286 "rediss://:node-secret@node.example.com:7000"
3287 );
3288 }
3289
3290 #[test]
3291 fn test_from_seed_parses_plain_host_port() {
3292 let node = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
3293 assert_eq!(node.host, "cluster-node-1");
3294 assert_eq!(node.port, 7005);
3295 }
3296
3297 #[test]
3298 fn test_from_seed_keeps_scheme_urls() {
3299 let node =
3300 ClusterNode::from_seed("rediss://secure.example.com:7005").expect("node should parse");
3301 assert_eq!(node.host, "rediss://secure.example.com:7005");
3302 assert_eq!(node.port, 7005);
3303 }
3304
3305 #[test]
3306 fn test_to_url_aws_elasticache_hostname() {
3307 let node = ClusterNode {
3308 host: "rediss://my-cluster.use1.cache.amazonaws.com".to_string(),
3309 port: 6379,
3310 };
3311 assert_eq!(
3312 node.to_url(),
3313 "rediss://my-cluster.use1.cache.amazonaws.com:6379"
3314 );
3315 }
3316
3317 #[test]
3318 fn test_to_url_with_ipv6_no_port() {
3319 let node = ClusterNode {
3320 host: "rediss://[::1]".to_string(),
3321 port: 6379,
3322 };
3323 assert_eq!(node.to_url(), "rediss://[::1]:6379");
3324 }
3325
3326 #[test]
3327 fn test_to_url_with_ipv6_and_port_in_url() {
3328 let node = ClusterNode {
3329 host: "rediss://[::1]:7000".to_string(),
3330 port: 6379,
3331 };
3332 assert_eq!(node.to_url(), "rediss://[::1]:7000");
3333 }
3334
3335 #[test]
3336 fn test_to_url_with_ipv6_full_address_no_port() {
3337 let node = ClusterNode {
3338 host: "rediss://[2001:db8::1]".to_string(),
3339 port: 6379,
3340 };
3341 assert_eq!(node.to_url(), "rediss://[2001:db8::1]:6379");
3342 }
3343
#[test]
fn test_to_url_with_ipv6_full_address_with_port() {
    // Bracketed IPv6 address that already carries port 7000: the URL is
    // expected to come back unchanged, ignoring the `port` field.
    let original = "rediss://[2001:db8::1]:7000";
    let ipv6_node = ClusterNode {
        host: original.to_string(),
        port: 6379,
    };

    assert_eq!(ipv6_node.to_url(), "rediss://[2001:db8::1]:7000");
}
3352
#[test]
fn test_to_url_with_redis_protocol_ipv6() {
    // Same IPv6 handling as the `rediss://` cases, but with the plain
    // `redis://` scheme, which must be preserved in the output.
    let host = String::from("redis://[::1]");
    let plain_node = ClusterNode { host, port: 6379 };

    let url = plain_node.to_url();

    assert_eq!(url, "redis://[::1]:6379");
}
3361}
3362
#[cfg(test)]
mod cors_config_tests {
    use super::CorsConfig;

    /// Deserializes a `CorsConfig` from a JSON document using `sonic_rs`,
    /// returning the deserializer's error untouched on failure.
    fn cors_from_json(json: &str) -> sonic_rs::Result<CorsConfig> {
        sonic_rs::from_str::<CorsConfig>(json)
    }

    /// Two fully-qualified origins deserialize into two entries.
    #[test]
    fn test_deserialize_valid_exact_origins() {
        let json = r#"{"origin": ["https://example.com", "https://other.com"]}"#;
        let config = cors_from_json(json).unwrap();
        assert_eq!(config.origin.len(), 2);
    }

    /// Wildcard subdomain patterns, with or without a scheme, are accepted.
    #[test]
    fn test_deserialize_valid_wildcard_patterns() {
        let json = r#"{"origin": ["*.example.com", "https://*.staging.example.com"]}"#;
        let config = cors_from_json(json).unwrap();
        assert_eq!(config.origin.len(), 2);
    }

    /// The catch-all markers (`*`, `Any`, `any`) are accepted, alone or
    /// alongside an explicit origin.
    #[test]
    fn test_deserialize_allows_special_markers() {
        let accepts = |json: &str| cors_from_json(json).is_ok();

        assert!(accepts(r#"{"origin": ["*"]}"#));
        assert!(accepts(r#"{"origin": ["Any"]}"#));
        assert!(accepts(r#"{"origin": ["any"]}"#));
        assert!(accepts(r#"{"origin": ["*", "https://example.com"]}"#));
    }

    /// Malformed wildcard patterns, an empty string and a scheme without a
    /// host all fail deserialization.
    #[test]
    fn test_deserialize_rejects_invalid_patterns() {
        let rejects = |json: &str| cors_from_json(json).is_err();

        assert!(rejects(r#"{"origin": ["*.*bad"]}"#));
        assert!(rejects(r#"{"origin": ["*."]}"#));
        assert!(rejects(r#"{"origin": [""]}"#));
        assert!(rejects(r#"{"origin": ["https://"]}"#));
    }

    /// A single invalid entry makes the whole origin list fail, even when the
    /// other entries are valid.
    #[test]
    fn test_deserialize_rejects_mixed_valid_and_invalid() {
        let outcome = cors_from_json(r#"{"origin": ["https://good.com", "*.*bad"]}"#);
        assert!(outcome.is_err());
    }
}