1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12 D: serde::Deserializer<'de>,
13{
14 use serde::de::{self};
15
16 let s = String::deserialize(deserializer)?;
17
18 if !s.chars().all(|c| c.is_digit(8)) {
20 return Err(de::Error::custom(format!(
21 "invalid octal permission mode '{}': must contain only digits 0-7",
22 s
23 )));
24 }
25
26 let mode = u32::from_str_radix(&s, 8)
28 .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30 if mode > 0o777 {
32 return Err(de::Error::custom(format!(
33 "permission mode '{}' exceeds maximum value 777",
34 s
35 )));
36 }
37
38 Ok(mode)
39}
40
41fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43 driver_str: String,
44 default_driver: T,
45 driver_name: &str,
46) -> T
47where
48 <T as FromStr>::Err: std::fmt::Debug,
49{
50 match T::from_str(&driver_str.to_lowercase()) {
51 Ok(driver_enum) => driver_enum,
52 Err(e) => {
53 warn!(
54 "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55 driver_name, driver_str, e, default_driver
56 );
57 default_driver
58 }
59 }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63 if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64 db_conn.pool_min = Some(min);
65 }
66 if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67 db_conn.pool_max = Some(max);
68 }
69}
70
/// Adapter driver selection.
///
/// Serde uses lowercase variant names, except `RedisCluster` (`redis-cluster`)
/// and `GooglePubSub` (`google-pubsub`), which are renamed explicitly.
/// `Local` is the default variant.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum AdapterDriver {
    #[default]
    Local,
    Redis,
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    Nats,
    Pulsar,
    RabbitMq,
    #[serde(rename = "google-pubsub")]
    GooglePubSub,
    Kafka,
}
88
/// DynamoDB settings.
///
/// Fields missing during deserialization are filled from the `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DynamoDbSettings {
    pub region: String,
    pub table_name: String,
    pub endpoint_url: Option<String>,
    pub aws_access_key_id: Option<String>,
    pub aws_secret_access_key: Option<String>,
    pub aws_profile_name: Option<String>,
}
99
100impl Default for DynamoDbSettings {
101 fn default() -> Self {
102 Self {
103 region: "us-east-1".to_string(),
104 table_name: "sockudo-applications".to_string(),
105 endpoint_url: None,
106 aws_access_key_id: None,
107 aws_secret_access_key: None,
108 aws_profile_name: None,
109 }
110 }
111}
112
/// ScyllaDB settings.
///
/// Fields missing during deserialization are filled from the `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ScyllaDbSettings {
    pub nodes: Vec<String>,
    pub keyspace: String,
    pub table_name: String,
    pub username: Option<String>,
    pub password: Option<String>,
    pub replication_class: String,
    pub replication_factor: u32,
}
124
125impl Default for ScyllaDbSettings {
126 fn default() -> Self {
127 Self {
128 nodes: vec!["127.0.0.1:9042".to_string()],
129 keyspace: "sockudo".to_string(),
130 table_name: "applications".to_string(),
131 username: None,
132 password: None,
133 replication_class: "SimpleStrategy".to_string(),
134 replication_factor: 3,
135 }
136 }
137}
138
/// SurrealDB settings.
///
/// Fields missing during deserialization are filled from the `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SurrealDbSettings {
    pub url: String,
    pub namespace: String,
    pub database: String,
    pub username: String,
    pub password: String,
    pub table_name: String,
    pub cache_ttl: u64,
    pub cache_max_capacity: u64,
}
151
152impl Default for SurrealDbSettings {
153 fn default() -> Self {
154 Self {
155 url: "ws://127.0.0.1:8000".to_string(),
156 namespace: "sockudo".to_string(),
157 database: "sockudo".to_string(),
158 username: "root".to_string(),
159 password: "root".to_string(),
160 table_name: "applications".to_string(),
161 cache_ttl: 300,
162 cache_max_capacity: 100,
163 }
164 }
165}
166
167impl FromStr for AdapterDriver {
168 type Err = String;
169 fn from_str(s: &str) -> Result<Self, Self::Err> {
170 match s.to_lowercase().as_str() {
171 "local" => Ok(AdapterDriver::Local),
172 "redis" => Ok(AdapterDriver::Redis),
173 "redis-cluster" => Ok(AdapterDriver::RedisCluster),
174 "nats" => Ok(AdapterDriver::Nats),
175 "pulsar" => Ok(AdapterDriver::Pulsar),
176 "rabbitmq" | "rabbit-mq" => Ok(AdapterDriver::RabbitMq),
177 "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(AdapterDriver::GooglePubSub),
178 "kafka" => Ok(AdapterDriver::Kafka),
179 _ => Err(format!("Unknown adapter driver: {s}")),
180 }
181 }
182}
183
/// App manager driver selection.
///
/// Serde uses lowercase variant names; `Memory` is the default variant.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum AppManagerDriver {
    #[default]
    Memory,
    Mysql,
    Dynamodb,
    PgSql,
    SurrealDb,
    ScyllaDb,
}
195impl FromStr for AppManagerDriver {
196 type Err = String;
197 fn from_str(s: &str) -> Result<Self, Self::Err> {
198 match s.to_lowercase().as_str() {
199 "memory" => Ok(AppManagerDriver::Memory),
200 "mysql" => Ok(AppManagerDriver::Mysql),
201 "dynamodb" => Ok(AppManagerDriver::Dynamodb),
202 "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
203 "surreal" | "surrealdb" => Ok(AppManagerDriver::SurrealDb),
204 "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
205 _ => Err(format!("Unknown app manager driver: {s}")),
206 }
207 }
208}
209
/// Cache driver selection.
///
/// Serde uses lowercase variant names, with `RedisCluster` renamed to
/// `redis-cluster`. `Memory` is the default variant.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum CacheDriver {
    #[default]
    Memory,
    Redis,
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    None,
}
220
221impl FromStr for CacheDriver {
222 type Err = String;
223 fn from_str(s: &str) -> Result<Self, Self::Err> {
224 match s.to_lowercase().as_str() {
225 "memory" => Ok(CacheDriver::Memory),
226 "redis" => Ok(CacheDriver::Redis),
227 "redis-cluster" => Ok(CacheDriver::RedisCluster),
228 "none" => Ok(CacheDriver::None),
229 _ => Err(format!("Unknown cache driver: {s}")),
230 }
231 }
232}
233
/// Queue driver selection.
///
/// Serde uses lowercase variant names, with `RedisCluster` renamed to
/// `redis-cluster`. `Redis` is the default variant.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum QueueDriver {
    Memory,
    #[default]
    Redis,
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    Sqs,
    Sns,
    None,
}
246
247impl FromStr for QueueDriver {
248 type Err = String;
249 fn from_str(s: &str) -> Result<Self, Self::Err> {
250 match s.to_lowercase().as_str() {
251 "memory" => Ok(QueueDriver::Memory),
252 "redis" => Ok(QueueDriver::Redis),
253 "redis-cluster" => Ok(QueueDriver::RedisCluster),
254 "sqs" => Ok(QueueDriver::Sqs),
255 "sns" => Ok(QueueDriver::Sns),
256 "none" => Ok(QueueDriver::None),
257 _ => Err(format!("Unknown queue driver: {s}")),
258 }
259 }
260}
261
262impl AsRef<str> for QueueDriver {
263 fn as_ref(&self) -> &str {
264 match self {
265 QueueDriver::Memory => "memory",
266 QueueDriver::Redis => "redis",
267 QueueDriver::RedisCluster => "redis-cluster",
268 QueueDriver::Sqs => "sqs",
269 QueueDriver::Sns => "sns",
270 QueueDriver::None => "none",
271 }
272 }
273}
274
/// Queue settings for the Redis Cluster driver.
///
/// Fields missing during deserialization are filled from the `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisClusterQueueConfig {
    pub concurrency: u32,
    pub prefix: Option<String>,
    pub nodes: Vec<String>,
    pub request_timeout_ms: u64,
}
283
284impl Default for RedisClusterQueueConfig {
285 fn default() -> Self {
286 Self {
287 concurrency: 5,
288 prefix: Some("sockudo_queue:".to_string()),
289 nodes: vec!["redis://127.0.0.1:6379".to_string()],
290 request_timeout_ms: 5000,
291 }
292 }
293}
294
/// Metrics driver selection; `Prometheus` is the only (and default) variant.
/// Serde uses the lowercase variant name.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum MetricsDriver {
    #[default]
    Prometheus,
}
301
/// Log output format selection; `Human` is the default variant.
/// Serde uses lowercase variant names.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum LogOutputFormat {
    #[default]
    Human,
    Json,
}
309
310impl FromStr for LogOutputFormat {
311 type Err = String;
312 fn from_str(s: &str) -> Result<Self, Self::Err> {
313 match s.to_lowercase().as_str() {
314 "human" => Ok(LogOutputFormat::Human),
315 "json" => Ok(LogOutputFormat::Json),
316 _ => Err(format!("Unknown log output format: {s}")),
317 }
318 }
319}
320
321impl FromStr for MetricsDriver {
322 type Err = String;
323 fn from_str(s: &str) -> Result<Self, Self::Err> {
324 match s.to_lowercase().as_str() {
325 "prometheus" => Ok(MetricsDriver::Prometheus),
326 _ => Err(format!("Unknown metrics driver: {s}")),
327 }
328 }
329}
330
331impl AsRef<str> for MetricsDriver {
332 fn as_ref(&self) -> &str {
333 match self {
334 MetricsDriver::Prometheus => "prometheus",
335 }
336 }
337}
338
/// Top-level server options, aggregating every per-subsystem configuration
/// section alongside a few scalar settings (host, port, mode, timeouts).
///
/// Fields missing during deserialization are filled from the `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ServerOptions {
    pub adapter: AdapterConfig,
    pub app_manager: AppManagerConfig,
    pub cache: CacheConfig,
    pub channel_limits: ChannelLimits,
    pub cors: CorsConfig,
    pub database: DatabaseConfig,
    pub database_pooling: DatabasePooling,
    pub debug: bool,
    pub event_limits: EventLimits,
    pub host: String,
    pub http_api: HttpApiConfig,
    pub instance: InstanceConfig,
    pub logging: Option<LoggingConfig>,
    pub metrics: MetricsConfig,
    pub mode: String,
    pub port: u16,
    pub path_prefix: String,
    pub presence: PresenceConfig,
    pub queue: QueueConfig,
    pub rate_limiter: RateLimiterConfig,
    pub shutdown_grace_period: u64,
    pub ssl: SslConfig,
    pub user_authentication_timeout: u64,
    pub webhooks: WebhooksConfig,
    pub websocket_max_payload_kb: u32,
    pub cleanup: CleanupConfig,
    pub activity_timeout: u64,
    pub cluster_health: ClusterHealthConfig,
    pub unix_socket: UnixSocketConfig,
    pub delta_compression: DeltaCompressionOptionsConfig,
    pub tag_filtering: TagFilteringConfig,
    pub websocket: WebSocketConfig,
    pub connection_recovery: ConnectionRecoveryConfig,
    pub idempotency: IdempotencyConfig,
    pub ephemeral: EphemeralConfig,
    pub echo_control: EchoControlConfig,
    pub event_name_filtering: EventNameFilteringConfig,
}
381
/// Queue settings for the SQS driver.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SqsQueueConfig {
    pub region: String,
    pub queue_url_prefix: Option<String>,
    pub visibility_timeout: i32,
    pub endpoint_url: Option<String>,
    pub max_messages: i32,
    pub wait_time_seconds: i32,
    pub concurrency: u32,
    pub fifo: bool,
    pub message_group_id: Option<String>,
}
397
/// Queue settings for the SNS driver.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SnsQueueConfig {
    pub region: String,
    pub topic_arn: String,
    pub endpoint_url: Option<String>,
}
405
/// Adapter configuration: the selected driver plus one settings section per
/// supported backend.
///
/// Fields missing during deserialization are filled from the `Default`
/// implementation (`#[serde(default)]`), except where a field names its own
/// default function below.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AdapterConfig {
    pub driver: AdapterDriver,
    pub redis: RedisAdapterConfig,
    pub cluster: RedisClusterAdapterConfig,
    pub nats: NatsAdapterConfig,
    pub pulsar: PulsarAdapterConfig,
    pub rabbitmq: RabbitMqAdapterConfig,
    pub google_pubsub: GooglePubSubAdapterConfig,
    pub kafka: KafkaAdapterConfig,
    // Falls back to `default_buffer_multiplier_per_cpu()` when absent.
    #[serde(default = "default_buffer_multiplier_per_cpu")]
    pub buffer_multiplier_per_cpu: usize,
    pub cluster_health: ClusterHealthConfig,
    // Falls back to `default_enable_socket_counting()` when absent.
    #[serde(default = "default_enable_socket_counting")]
    pub enable_socket_counting: bool,
}
423
/// Default for `AdapterConfig::enable_socket_counting`: enabled.
fn default_enable_socket_counting() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
427
/// Default for `AdapterConfig::buffer_multiplier_per_cpu`.
fn default_buffer_multiplier_per_cpu() -> usize {
    const DEFAULT_MULTIPLIER: usize = 64;
    DEFAULT_MULTIPLIER
}
431
432impl Default for AdapterConfig {
433 fn default() -> Self {
434 Self {
435 driver: AdapterDriver::default(),
436 redis: RedisAdapterConfig::default(),
437 cluster: RedisClusterAdapterConfig::default(),
438 nats: NatsAdapterConfig::default(),
439 pulsar: PulsarAdapterConfig::default(),
440 rabbitmq: RabbitMqAdapterConfig::default(),
441 google_pubsub: GooglePubSubAdapterConfig::default(),
442 kafka: KafkaAdapterConfig::default(),
443 buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
444 cluster_health: ClusterHealthConfig::default(),
445 enable_socket_counting: default_enable_socket_counting(),
446 }
447 }
448}
449
/// Settings for the Redis adapter.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisAdapterConfig {
    pub requests_timeout: u64,
    pub prefix: String,
    pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
    pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
    pub cluster_mode: bool,
}
459
/// Settings for the Redis Cluster adapter.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisClusterAdapterConfig {
    pub nodes: Vec<String>,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub use_connection_manager: bool,
    // Field-level default: falls back to `bool::default()` (false) when absent.
    #[serde(default)]
    pub use_sharded_pubsub: bool,
}
470
/// Settings for the NATS adapter.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct NatsAdapterConfig {
    pub servers: Vec<String>,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub username: Option<String>,
    pub password: Option<String>,
    pub token: Option<String>,
    pub connection_timeout_ms: u64,
    pub nodes_number: Option<u32>,
    pub discovery_max_wait_ms: u64,
    pub discovery_idle_wait_ms: u64,
}
485
/// Settings for the Pulsar adapter.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PulsarAdapterConfig {
    pub url: String,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub token: Option<String>,
    pub nodes_number: Option<u32>,
}
495
/// Settings for the RabbitMQ adapter.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RabbitMqAdapterConfig {
    pub url: String,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub connection_timeout_ms: u64,
    pub nodes_number: Option<u32>,
}
505
/// Settings for the Google Pub/Sub adapter.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GooglePubSubAdapterConfig {
    pub project_id: String,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub emulator_host: Option<String>,
    pub nodes_number: Option<u32>,
}
515
/// Settings for the Kafka adapter.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct KafkaAdapterConfig {
    pub brokers: Vec<String>,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub security_protocol: Option<String>,
    pub sasl_mechanism: Option<String>,
    pub sasl_username: Option<String>,
    pub sasl_password: Option<String>,
    pub nodes_number: Option<u32>,
}
528
/// App manager configuration: the selected driver plus its array, cache and
/// ScyllaDB sections. `Default` is derived, so every field starts from its
/// own type's default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct AppManagerConfig {
    pub driver: AppManagerDriver,
    pub array: ArrayConfig,
    pub cache: CacheSettings,
    pub scylladb: ScyllaDbSettings,
}
537
/// A list of `App` definitions; empty by default (derived `Default`).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ArrayConfig {
    pub apps: Vec<App>,
}
543
/// Cache toggle and TTL used by `AppManagerConfig::cache`.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CacheSettings {
    pub enabled: bool,
    pub ttl: u64,
}
550
/// Options for the in-memory cache section of `CacheConfig`.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MemoryCacheOptions {
    pub ttl: u64,
    pub cleanup_interval: u64,
    pub max_capacity: u64,
}
558
/// Cache configuration: the selected driver plus Redis and memory sections.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CacheConfig {
    pub driver: CacheDriver,
    pub redis: RedisConfig,
    pub memory: MemoryCacheOptions,
}
566
/// Redis options shared by the cache and rate limiter sections. `Default` is
/// derived: no prefix, no URL override, cluster mode off.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RedisConfig {
    pub prefix: Option<String>,
    pub url_override: Option<String>,
    pub cluster_mode: bool,
}
574
/// Channel-related limits.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ChannelLimits {
    pub max_name_length: u32,
    pub cache_ttl: u64,
}
581
/// CORS configuration.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CorsConfig {
    pub credentials: bool,
    // Deserialized through `deserialize_and_validate_cors_origins`, which
    // rejects origin lists that fail pattern validation.
    #[serde(deserialize_with = "deserialize_and_validate_cors_origins")]
    pub origin: Vec<String>,
    pub methods: Vec<String>,
    pub allowed_headers: Vec<String>,
}
591
592fn deserialize_and_validate_cors_origins<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
593where
594 D: serde::Deserializer<'de>,
595{
596 use serde::de::Error;
597 let origins = Vec::<String>::deserialize(deserializer)?;
598
599 if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&origins) {
600 return Err(D::Error::custom(format!(
601 "CORS origin pattern validation failed: {}",
602 e
603 )));
604 }
605
606 Ok(origins)
607}
608
/// Connection settings for every supported database backend. `Default` is
/// derived, so each section starts from its own type's default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct DatabaseConfig {
    pub mysql: DatabaseConnection,
    pub postgres: DatabaseConnection,
    pub redis: RedisConnection,
    pub dynamodb: DynamoDbSettings,
    pub surrealdb: SurrealDbSettings,
    pub scylladb: ScyllaDbSettings,
}
619
/// Connection settings for a SQL database (used for both the `mysql` and
/// `postgres` sections of `DatabaseConfig`).
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabaseConnection {
    pub host: String,
    pub port: u16,
    pub username: String,
    pub password: String,
    pub database: String,
    pub table_name: String,
    pub connection_pool_size: u32,
    // Optional pool bounds; `override_db_pool_settings` can set these from
    // `<prefix>_POOL_MIN` / `<prefix>_POOL_MAX`.
    pub pool_min: Option<u32>,
    pub pool_max: Option<u32>,
    pub cache_ttl: u64,
    pub cache_cleanup_interval: u64,
    pub cache_max_capacity: u64,
}
636
/// Redis connection settings covering standalone, sentinel and cluster setups.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisConnection {
    // May carry a `redis://` or `rediss://` prefix; `build_standard_url`
    // honours it when building the URL.
    pub host: String,
    pub port: u16,
    pub db: u32,
    pub username: Option<String>,
    pub password: Option<String>,
    pub key_prefix: String,
    // A non-empty list switches `to_url` to the sentinel URL form.
    pub sentinels: Vec<RedisSentinel>,
    pub sentinel_password: Option<String>,
    // Written as a path segment of the sentinel URL.
    pub name: String,
    pub cluster: RedisClusterConnection,
    // Used by `cluster_node_urls` only when `cluster.nodes` is empty.
    pub cluster_nodes: Vec<ClusterNode>,
}
653
/// Host and port of a single Redis sentinel.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisSentinel {
    pub host: String,
    pub port: u16,
}
660
/// Cluster-specific Redis settings. `Default` is derived: no nodes, no
/// credentials, TLS off.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RedisClusterConnection {
    pub nodes: Vec<ClusterNode>,
    pub username: Option<String>,
    pub password: Option<String>,
    // Also accepted under the key `useTLS` when deserializing.
    #[serde(alias = "useTLS")]
    pub use_tls: bool,
}
670
671impl RedisConnection {
672 pub fn is_sentinel_configured(&self) -> bool {
674 !self.sentinels.is_empty()
675 }
676
677 pub fn to_url(&self) -> String {
679 if self.is_sentinel_configured() {
680 self.build_sentinel_url()
681 } else {
682 self.build_standard_url()
683 }
684 }
685
686 fn build_standard_url(&self) -> String {
687 let (scheme, host) = if self.host.starts_with("rediss://") {
689 ("rediss://", self.host.trim_start_matches("rediss://"))
690 } else if self.host.starts_with("redis://") {
691 ("redis://", self.host.trim_start_matches("redis://"))
692 } else {
693 ("redis://", self.host.as_str())
694 };
695
696 let mut url = String::from(scheme);
697
698 if let Some(ref username) = self.username {
699 url.push_str(username);
700 if let Some(ref password) = self.password {
701 url.push(':');
702 url.push_str(&urlencoding::encode(password));
703 }
704 url.push('@');
705 } else if let Some(ref password) = self.password {
706 url.push(':');
707 url.push_str(&urlencoding::encode(password));
708 url.push('@');
709 }
710
711 url.push_str(host);
712 url.push(':');
713 url.push_str(&self.port.to_string());
714 url.push('/');
715 url.push_str(&self.db.to_string());
716
717 url
718 }
719
720 fn build_sentinel_url(&self) -> String {
721 let mut url = String::from("redis+sentinel://");
722
723 if let Some(ref sentinel_password) = self.sentinel_password {
724 url.push(':');
725 url.push_str(&urlencoding::encode(sentinel_password));
726 url.push('@');
727 }
728
729 let sentinel_hosts: Vec<String> = self
730 .sentinels
731 .iter()
732 .map(|s| format!("{}:{}", s.host, s.port))
733 .collect();
734 url.push_str(&sentinel_hosts.join(","));
735
736 url.push('/');
737 url.push_str(&self.name);
738 url.push('/');
739 url.push_str(&self.db.to_string());
740
741 let mut params = Vec::new();
742 if let Some(ref password) = self.password {
743 params.push(format!("password={}", urlencoding::encode(password)));
744 }
745 if let Some(ref username) = self.username {
746 params.push(format!("username={}", urlencoding::encode(username)));
747 }
748
749 if !params.is_empty() {
750 url.push('?');
751 url.push_str(¶ms.join("&"));
752 }
753
754 url
755 }
756
757 pub fn has_cluster_nodes(&self) -> bool {
760 !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
761 }
762
763 pub fn cluster_node_urls(&self) -> Vec<String> {
766 if !self.cluster.nodes.is_empty() {
767 return self.build_cluster_urls(&self.cluster.nodes);
768 }
769 self.build_cluster_urls(&self.cluster_nodes)
770 }
771
772 pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
775 self.build_cluster_urls(
776 &seeds
777 .iter()
778 .filter_map(|seed| ClusterNode::from_seed(seed))
779 .collect::<Vec<ClusterNode>>(),
780 )
781 }
782
783 fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
784 let username = self
785 .cluster
786 .username
787 .as_deref()
788 .or(self.username.as_deref());
789 let password = self
790 .cluster
791 .password
792 .as_deref()
793 .or(self.password.as_deref());
794 let use_tls = self.cluster.use_tls;
795
796 nodes
797 .iter()
798 .map(|node| node.to_url_with_options(use_tls, username, password))
799 .collect()
800 }
801}
802
803impl RedisSentinel {
804 pub fn to_host_port(&self) -> String {
805 format!("{}:{}", self.host, self.port)
806 }
807}
808
/// Host and port of a single Redis cluster node.
///
/// `host` may be a plain host, a `host:port` pair, a bracketed address, or a
/// full `redis://` / `rediss://` URL; `to_url_with_options` handles each form.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ClusterNode {
    pub host: String,
    pub port: u16,
}
815
impl ClusterNode {
    /// Renders the node as a URL without TLS and without credentials.
    pub fn to_url(&self) -> String {
        self.to_url_with_options(false, None, None)
    }

    /// Renders the node as a Redis URL.
    ///
    /// * When `host` already starts with `redis://` or `rediss://`, the scheme
    ///   comes from that URL (so `use_tls` is not consulted), and credentials
    ///   embedded in it take precedence over `username` / `password`.
    /// * Otherwise `host` is split into host and port with
    ///   `split_plain_host_and_port`, and `use_tls` picks `rediss` or `redis`.
    pub fn to_url_with_options(
        &self,
        use_tls: bool,
        username: Option<&str>,
        password: Option<&str>,
    ) -> String {
        let host = self.host.trim();

        if host.starts_with("redis://") || host.starts_with("rediss://") {
            // First attempt: parse the host string as a URL as-is.
            if let Ok(parsed) = Url::parse(host)
                && let Some(host_str) = parsed.host_str()
            {
                let scheme = parsed.scheme();
                // Port from the URL when it yields one, else this node's `port`.
                let port = parsed.port_or_known_default().unwrap_or(self.port);
                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
                let parsed_password = parsed.password();
                // If the URL carries any credential, use only the embedded
                // pair; the passed-in credentials are ignored entirely.
                let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
                let (effective_username, effective_password) = if has_embedded_auth {
                    (parsed_username, parsed_password)
                } else {
                    (username, password)
                };

                return build_redis_url(
                    scheme,
                    host_str,
                    port,
                    effective_username,
                    effective_password,
                );
            }

            // Fallback: decide whether the string already carries a port.
            // With a ']' present, look for ':' from the last ']' onwards;
            // otherwise require at least three ':'-separated pieces (the
            // scheme separator plus one more colon).
            let has_port = if let Some(bracket_pos) = host.rfind(']') {
                host[bracket_pos..].contains(':')
            } else {
                host.split(':').count() >= 3
            };
            let base = if has_port {
                host.to_string()
            } else {
                format!("{}:{}", host, self.port)
            };

            // Second attempt, now with this node's port appended if needed.
            if let Ok(parsed) = Url::parse(&base) {
                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
                let parsed_password = parsed.password();
                if let Some(host_str) = parsed.host_str() {
                    let port = parsed.port_or_known_default().unwrap_or(self.port);
                    let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
                    let (effective_username, effective_password) = if has_embedded_auth {
                        (parsed_username, parsed_password)
                    } else {
                        (username, password)
                    };
                    return build_redis_url(
                        parsed.scheme(),
                        host_str,
                        port,
                        effective_username,
                        effective_password,
                    );
                }
            }
            // Could not be normalised; hand back the string unchanged
            // (no credentials are added on this path).
            return base;
        }

        // Plain host (no scheme): split off an inline port if present.
        let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
        let scheme = if use_tls { "rediss" } else { "redis" };
        build_redis_url(
            scheme,
            &normalized_host,
            normalized_port,
            username,
            password,
        )
    }

    /// Builds a node from a seed string.
    ///
    /// Returns `None` for an empty or whitespace-only seed. A `redis://` /
    /// `rediss://` seed is stored whole in `host`, with `port` taken from the
    /// URL or 6379. Any other seed is split into host and port, defaulting
    /// the port to 6379.
    pub fn from_seed(seed: &str) -> Option<Self> {
        let trimmed = seed.trim();
        if trimmed.is_empty() {
            return None;
        }

        if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
            let port = Url::parse(trimmed)
                .ok()
                .and_then(|parsed| parsed.port_or_known_default())
                .unwrap_or(6379);
            return Some(Self {
                host: trimmed.to_string(),
                port,
            });
        }

        let (host, port) = split_plain_host_and_port(trimmed, 6379);
        Some(Self { host, port })
    }
}
920
/// Splits a scheme-less host string into `(host, port)`.
///
/// * `[addr]:port` and `[addr]` yield `addr` without brackets; the port falls
///   back to `default_port` when missing or unparsable.
/// * `host:port` (exactly one colon, valid `u16` port) is split at the colon.
/// * Anything else is returned trimmed but otherwise unchanged, paired with
///   `default_port`.
fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
    let host = raw_host.trim();

    // Bracketed form, e.g. "[::1]:7000".
    if let Some(after_open) = host.strip_prefix('[') {
        return match after_open.split_once(']') {
            // No closing bracket: leave the string as it is.
            None => (host.to_string(), default_port),
            Some((inner, rest)) => {
                let port = rest
                    .strip_prefix(':')
                    .and_then(|text| text.parse::<u16>().ok())
                    .unwrap_or(default_port);
                (inner.to_string(), port)
            }
        };
    }

    // Only a single colon is treated as a host/port separator, so that bare
    // multi-colon addresses are not split.
    let single_colon_split = if host.matches(':').count() == 1 {
        host.rsplit_once(':')
    } else {
        None
    };

    let parsed = single_colon_split
        .and_then(|(name, text)| text.parse::<u16>().ok().map(|port| (name, port)));

    match parsed {
        Some((name, port)) => (name.to_string(), port),
        None => (host.to_string(), default_port),
    }
}
949
950fn build_redis_url(
951 scheme: &str,
952 host: &str,
953 port: u16,
954 username: Option<&str>,
955 password: Option<&str>,
956) -> String {
957 let mut url = format!("{scheme}://");
958
959 if let Some(user) = username {
960 url.push_str(&urlencoding::encode(user));
961 if let Some(pass) = password {
962 url.push(':');
963 url.push_str(&urlencoding::encode(pass));
964 }
965 url.push('@');
966 } else if let Some(pass) = password {
967 url.push(':');
968 url.push_str(&urlencoding::encode(pass));
969 url.push('@');
970 }
971
972 if host.contains(':') && !host.starts_with('[') {
973 url.push('[');
974 url.push_str(host);
975 url.push(']');
976 } else {
977 url.push_str(host);
978 }
979 url.push(':');
980 url.push_str(&port.to_string());
981 url
982}
983
/// Database pooling toggle and bounds.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabasePooling {
    pub enabled: bool,
    pub min: u32,
    pub max: u32,
}
991
/// Event-related limits.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EventLimits {
    pub max_channels_at_once: u32,
    pub max_name_length: u32,
    pub max_payload_in_kb: u32,
    pub max_batch_size: u32,
}
1000
/// Idempotency settings.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct IdempotencyConfig {
    pub enabled: bool,
    pub ttl_seconds: u64,
    pub max_key_length: usize,
}
1011
/// Toggle for the ephemeral feature.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EphemeralConfig {
    pub enabled: bool,
}
1018
/// Echo control settings.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EchoControlConfig {
    pub enabled: bool,
    pub default_echo_messages: bool,
}
1027
/// Event name filtering settings.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EventNameFilteringConfig {
    pub enabled: bool,
    pub max_events_per_filter: usize,
    pub max_event_name_length: usize,
}
1038
/// Connection recovery settings.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ConnectionRecoveryConfig {
    pub enabled: bool,
    pub buffer_ttl_seconds: u64,
    pub max_buffer_size: usize,
}
1052
/// HTTP API settings.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HttpApiConfig {
    pub request_limit_in_mb: u32,
    pub accept_traffic: AcceptTraffic,
    pub usage_enabled: bool,
}
1060
/// Traffic acceptance settings used by `HttpApiConfig::accept_traffic`.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AcceptTraffic {
    pub memory_threshold: f64,
}
1066
/// Instance identity settings.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct InstanceConfig {
    pub process_id: String,
}
1072
/// Metrics settings: toggle, driver, bind host/port and Prometheus options.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MetricsConfig {
    pub enabled: bool,
    pub driver: MetricsDriver,
    pub host: String,
    pub prometheus: PrometheusConfig,
    pub port: u16,
}
1082
/// Prometheus-specific metrics options.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PrometheusConfig {
    pub prefix: String,
}
1088
/// Logging presentation options.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingConfig {
    pub colors_enabled: bool,
    pub include_target: bool,
}
1095
/// Presence channel limits.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PresenceConfig {
    pub max_members_per_channel: u32,
    pub max_member_size_in_kb: u32,
}
1102
/// WebSocket buffering and transport settings.
///
/// Fields missing during deserialization are filled from the `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebSocketConfig {
    // `max_messages` and `max_bytes` together select the buffer limit built
    // by `to_buffer_config`.
    pub max_messages: Option<usize>,
    // Also used by `to_sockudo_ws_config` as the max payload length when set.
    pub max_bytes: Option<usize>,
    pub disconnect_on_buffer_full: bool,
    pub max_message_size: usize,
    pub max_frame_size: usize,
    pub write_buffer_size: usize,
    pub max_backpressure: usize,
    pub auto_ping: bool,
    pub ping_interval: u32,
    pub idle_timeout: u32,
    // Compression mode name, matched case-insensitively in
    // `to_sockudo_ws_config`; unrecognised values disable compression.
    pub compression: String,
}
1120
1121impl Default for WebSocketConfig {
1122 fn default() -> Self {
1123 Self {
1124 max_messages: Some(1000),
1125 max_bytes: None,
1126 disconnect_on_buffer_full: true,
1127 max_message_size: 64 * 1024 * 1024,
1128 max_frame_size: 16 * 1024 * 1024,
1129 write_buffer_size: 16 * 1024,
1130 max_backpressure: 1024 * 1024,
1131 auto_ping: true,
1132 ping_interval: 30,
1133 idle_timeout: 120,
1134 compression: "disabled".to_string(),
1135 }
1136 }
1137}
1138
1139impl WebSocketConfig {
1140 pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
1142 use crate::websocket::{BufferLimit, WebSocketBufferConfig};
1143
1144 let limit = match (self.max_messages, self.max_bytes) {
1145 (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
1146 (Some(messages), None) => BufferLimit::Messages(messages),
1147 (None, Some(bytes)) => BufferLimit::Bytes(bytes),
1148 (None, None) => BufferLimit::Messages(1000),
1149 };
1150
1151 WebSocketBufferConfig {
1152 limit,
1153 disconnect_on_full: self.disconnect_on_buffer_full,
1154 }
1155 }
1156
1157 pub fn to_sockudo_ws_config(
1159 &self,
1160 websocket_max_payload_kb: u32,
1161 activity_timeout: u64,
1162 ) -> sockudo_ws::Config {
1163 use sockudo_ws::Compression;
1164
1165 let compression = match self.compression.to_lowercase().as_str() {
1166 "dedicated" => Compression::Dedicated,
1167 "shared" => Compression::Shared,
1168 "window256b" => Compression::Window256B,
1169 "window1kb" => Compression::Window1KB,
1170 "window2kb" => Compression::Window2KB,
1171 "window4kb" => Compression::Window4KB,
1172 "window8kb" => Compression::Window8KB,
1173 "window16kb" => Compression::Window16KB,
1174 "window32kb" => Compression::Window32KB,
1175 _ => Compression::Disabled,
1176 };
1177
1178 sockudo_ws::Config::builder()
1179 .max_payload_length(
1180 self.max_bytes
1181 .unwrap_or(websocket_max_payload_kb as usize * 1024),
1182 )
1183 .max_message_size(self.max_message_size)
1184 .max_frame_size(self.max_frame_size)
1185 .write_buffer_size(self.write_buffer_size)
1186 .max_backpressure(self.max_backpressure)
1187 .idle_timeout(self.idle_timeout)
1188 .auto_ping(self.auto_ping)
1189 .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1190 .compression(compression)
1191 .build()
1192 }
1193}
1194
/// Queue configuration: the selected driver plus one section per backend.
/// `Default` is derived, so each field starts from its own type's default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct QueueConfig {
    pub driver: QueueDriver,
    pub redis: RedisQueueConfig,
    pub redis_cluster: RedisClusterQueueConfig,
    pub sqs: SqsQueueConfig,
    pub sns: SnsQueueConfig,
}
1204
/// Queue settings for the Redis driver.
///
/// Fields missing during deserialization are filled from the type's `Default`
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisQueueConfig {
    pub concurrency: u32,
    pub prefix: Option<String>,
    pub url_override: Option<String>,
    pub cluster_mode: bool,
}
1213
1214#[derive(Clone, Debug, Serialize, Deserialize)]
1215#[serde(default)]
1216pub struct RateLimit {
1217 pub max_requests: u32,
1218 pub window_seconds: u64,
1219 pub identifier: Option<String>,
1220 pub trust_hops: Option<u32>,
1221}
1222
1223#[derive(Debug, Clone, Serialize, Deserialize)]
1224#[serde(default)]
1225pub struct RateLimiterConfig {
1226 pub enabled: bool,
1227 pub driver: CacheDriver,
1228 pub api_rate_limit: RateLimit,
1229 pub websocket_rate_limit: RateLimit,
1230 pub redis: RedisConfig,
1231}
1232
1233#[derive(Debug, Clone, Serialize, Deserialize)]
1234#[serde(default)]
1235pub struct SslConfig {
1236 pub enabled: bool,
1237 pub cert_path: String,
1238 pub key_path: String,
1239 pub passphrase: Option<String>,
1240 pub ca_path: Option<String>,
1241 pub redirect_http: bool,
1242 pub http_port: Option<u16>,
1243}
1244
1245#[derive(Debug, Clone, Serialize, Deserialize)]
1246#[serde(default)]
1247pub struct WebhooksConfig {
1248 pub batching: BatchingConfig,
1249 pub retry: WebhookRetryConfig,
1250 pub request_timeout_ms: u64,
1251}
1252
1253#[derive(Debug, Clone, Serialize, Deserialize)]
1254#[serde(default)]
1255pub struct BatchingConfig {
1256 pub enabled: bool,
1257 pub duration: u64,
1258 pub size: usize,
1259}
1260
1261#[derive(Debug, Clone, Serialize, Deserialize)]
1262#[serde(default)]
1263pub struct WebhookRetryConfig {
1264 pub enabled: bool,
1265 pub max_attempts: Option<u32>,
1266 pub max_elapsed_time_ms: u64,
1267 pub initial_backoff_ms: u64,
1268 pub max_backoff_ms: u64,
1269}
1270
1271impl Default for WebhooksConfig {
1272 fn default() -> Self {
1273 Self {
1274 batching: BatchingConfig::default(),
1275 retry: WebhookRetryConfig::default(),
1276 request_timeout_ms: 10_000,
1277 }
1278 }
1279}
1280
1281impl Default for WebhookRetryConfig {
1282 fn default() -> Self {
1283 Self {
1284 enabled: true,
1285 max_attempts: None,
1286 max_elapsed_time_ms: 300_000,
1287 initial_backoff_ms: 1_000,
1288 max_backoff_ms: 60_000,
1289 }
1290 }
1291}
1292
1293#[derive(Debug, Clone, Serialize, Deserialize)]
1294#[serde(default)]
1295pub struct ClusterHealthConfig {
1296 pub enabled: bool,
1297 pub heartbeat_interval_ms: u64,
1298 pub node_timeout_ms: u64,
1299 pub cleanup_interval_ms: u64,
1300}
1301
1302#[derive(Debug, Clone, Serialize, Deserialize)]
1303#[serde(default)]
1304pub struct UnixSocketConfig {
1305 pub enabled: bool,
1306 pub path: String,
1307 #[serde(deserialize_with = "deserialize_octal_permission")]
1308 pub permission_mode: u32,
1309}
1310
1311#[derive(Debug, Clone, Serialize, Deserialize)]
1312#[serde(default)]
1313pub struct DeltaCompressionOptionsConfig {
1314 pub enabled: bool,
1315 pub algorithm: String,
1316 pub full_message_interval: u32,
1317 pub min_message_size: usize,
1318 pub max_state_age_secs: u64,
1319 pub max_channel_states_per_socket: usize,
1320 pub max_conflation_states_per_channel: Option<usize>,
1321 pub conflation_key_path: Option<String>,
1322 pub cluster_coordination: bool,
1323 pub omit_delta_algorithm: bool,
1324}
1325
1326#[derive(Debug, Clone, Serialize, Deserialize)]
1328#[serde(default)]
1329pub struct CleanupConfig {
1330 pub queue_buffer_size: usize,
1331 pub batch_size: usize,
1332 pub batch_timeout_ms: u64,
1333 pub worker_threads: WorkerThreadsConfig,
1334 pub max_retry_attempts: u32,
1335 pub async_enabled: bool,
1336 pub fallback_to_sync: bool,
1337}
1338
/// Worker thread count: either chosen automatically or pinned to a number.
///
/// Serialized as the string `"auto"` or as an integer.
#[derive(Clone, Debug)]
pub enum WorkerThreadsConfig {
    /// Written as `"auto"`.
    Auto,
    /// An explicit thread count.
    Fixed(usize),
}
1345
1346impl serde::Serialize for WorkerThreadsConfig {
1347 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1348 where
1349 S: serde::Serializer,
1350 {
1351 match self {
1352 WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1353 WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1354 }
1355 }
1356}
1357
1358impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1359 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1360 where
1361 D: serde::Deserializer<'de>,
1362 {
1363 use serde::de;
1364 struct WorkerThreadsVisitor;
1365 impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1366 type Value = WorkerThreadsConfig;
1367
1368 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1369 formatter.write_str(r#""auto" or a positive integer"#)
1370 }
1371
1372 fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1373 if value.eq_ignore_ascii_case("auto") {
1374 Ok(WorkerThreadsConfig::Auto)
1375 } else if let Ok(n) = value.parse::<usize>() {
1376 Ok(WorkerThreadsConfig::Fixed(n))
1377 } else {
1378 Err(E::custom(format!(
1379 "expected 'auto' or a number, got '{value}'"
1380 )))
1381 }
1382 }
1383
1384 fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1385 Ok(WorkerThreadsConfig::Fixed(value as usize))
1386 }
1387
1388 fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1389 if value >= 0 {
1390 Ok(WorkerThreadsConfig::Fixed(value as usize))
1391 } else {
1392 Err(E::custom("worker_threads must be non-negative"))
1393 }
1394 }
1395 }
1396 deserializer.deserialize_any(WorkerThreadsVisitor)
1397 }
1398}
1399
1400impl Default for CleanupConfig {
1401 fn default() -> Self {
1402 Self {
1403 queue_buffer_size: 1024,
1404 batch_size: 64,
1405 batch_timeout_ms: 100,
1406 worker_threads: WorkerThreadsConfig::Auto,
1407 max_retry_attempts: 3,
1408 async_enabled: true,
1409 fallback_to_sync: true,
1410 }
1411 }
1412}
1413
1414impl CleanupConfig {
1415 pub fn validate(&self) -> Result<(), String> {
1416 if self.queue_buffer_size == 0 {
1417 return Err("queue_buffer_size must be greater than 0".to_string());
1418 }
1419 if self.batch_size == 0 {
1420 return Err("batch_size must be greater than 0".to_string());
1421 }
1422 if self.batch_timeout_ms == 0 {
1423 return Err("batch_timeout_ms must be greater than 0".to_string());
1424 }
1425 if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1426 && n == 0
1427 {
1428 return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1429 }
1430 Ok(())
1431 }
1432}
1433
1434#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1435#[serde(default)]
1436pub struct TagFilteringConfig {
1437 #[serde(default)]
1438 pub enabled: bool,
1439 #[serde(default = "default_true")]
1440 pub enable_tags: bool,
1441}
1442
/// Serde default helper for boolean fields whose missing value means `true`.
fn default_true() -> bool {
    true
}
1446
1447impl Default for ServerOptions {
1450 fn default() -> Self {
1451 Self {
1452 adapter: AdapterConfig::default(),
1453 app_manager: AppManagerConfig::default(),
1454 cache: CacheConfig::default(),
1455 channel_limits: ChannelLimits::default(),
1456 cors: CorsConfig::default(),
1457 database: DatabaseConfig::default(),
1458 database_pooling: DatabasePooling::default(),
1459 debug: false,
1460 tag_filtering: TagFilteringConfig::default(),
1461 event_limits: EventLimits::default(),
1462 host: "0.0.0.0".to_string(),
1463 http_api: HttpApiConfig::default(),
1464 instance: InstanceConfig::default(),
1465 logging: None,
1466 metrics: MetricsConfig::default(),
1467 mode: "production".to_string(),
1468 port: 6001,
1469 path_prefix: "/".to_string(),
1470 presence: PresenceConfig::default(),
1471 queue: QueueConfig::default(),
1472 rate_limiter: RateLimiterConfig::default(),
1473 shutdown_grace_period: 10,
1474 ssl: SslConfig::default(),
1475 user_authentication_timeout: 3600,
1476 webhooks: WebhooksConfig::default(),
1477 websocket_max_payload_kb: 64,
1478 cleanup: CleanupConfig::default(),
1479 activity_timeout: 120,
1480 cluster_health: ClusterHealthConfig::default(),
1481 unix_socket: UnixSocketConfig::default(),
1482 delta_compression: DeltaCompressionOptionsConfig::default(),
1483 websocket: WebSocketConfig::default(),
1484 connection_recovery: ConnectionRecoveryConfig::default(),
1485 idempotency: IdempotencyConfig::default(),
1486 ephemeral: EphemeralConfig::default(),
1487 echo_control: EchoControlConfig::default(),
1488 event_name_filtering: EventNameFilteringConfig::default(),
1489 }
1490 }
1491}
1492
1493impl Default for SqsQueueConfig {
1494 fn default() -> Self {
1495 Self {
1496 region: "us-east-1".to_string(),
1497 queue_url_prefix: None,
1498 visibility_timeout: 30,
1499 endpoint_url: None,
1500 max_messages: 10,
1501 wait_time_seconds: 5,
1502 concurrency: 5,
1503 fifo: false,
1504 message_group_id: Some("default".to_string()),
1505 }
1506 }
1507}
1508
1509impl Default for SnsQueueConfig {
1510 fn default() -> Self {
1511 Self {
1512 region: "us-east-1".to_string(),
1513 topic_arn: String::new(),
1514 endpoint_url: None,
1515 }
1516 }
1517}
1518
1519impl Default for RedisAdapterConfig {
1520 fn default() -> Self {
1521 Self {
1522 requests_timeout: 5000,
1523 prefix: "sockudo_adapter:".to_string(),
1524 redis_pub_options: AHashMap::new(),
1525 redis_sub_options: AHashMap::new(),
1526 cluster_mode: false,
1527 }
1528 }
1529}
1530
1531impl Default for RedisClusterAdapterConfig {
1532 fn default() -> Self {
1533 Self {
1534 nodes: vec![],
1535 prefix: "sockudo_adapter:".to_string(),
1536 request_timeout_ms: 1000,
1537 use_connection_manager: true,
1538 use_sharded_pubsub: false,
1539 }
1540 }
1541}
1542
1543impl Default for NatsAdapterConfig {
1544 fn default() -> Self {
1545 Self {
1546 servers: vec!["nats://localhost:4222".to_string()],
1547 prefix: "sockudo_adapter:".to_string(),
1548 request_timeout_ms: 5000,
1549 username: None,
1550 password: None,
1551 token: None,
1552 connection_timeout_ms: 5000,
1553 nodes_number: None,
1554 discovery_max_wait_ms: 1000,
1555 discovery_idle_wait_ms: 150,
1556 }
1557 }
1558}
1559
1560impl Default for PulsarAdapterConfig {
1561 fn default() -> Self {
1562 Self {
1563 url: "pulsar://127.0.0.1:6650".to_string(),
1564 prefix: "sockudo-adapter".to_string(),
1565 request_timeout_ms: 5000,
1566 token: None,
1567 nodes_number: None,
1568 }
1569 }
1570}
1571
1572impl Default for RabbitMqAdapterConfig {
1573 fn default() -> Self {
1574 Self {
1575 url: "amqp://guest:guest@127.0.0.1:5672/%2f".to_string(),
1576 prefix: "sockudo_adapter".to_string(),
1577 request_timeout_ms: 5000,
1578 connection_timeout_ms: 5000,
1579 nodes_number: None,
1580 }
1581 }
1582}
1583
1584impl Default for GooglePubSubAdapterConfig {
1585 fn default() -> Self {
1586 Self {
1587 project_id: "".to_string(),
1588 prefix: "sockudo-adapter".to_string(),
1589 request_timeout_ms: 5000,
1590 emulator_host: None,
1591 nodes_number: None,
1592 }
1593 }
1594}
1595
1596impl Default for KafkaAdapterConfig {
1597 fn default() -> Self {
1598 Self {
1599 brokers: vec!["localhost:9092".to_string()],
1600 prefix: "sockudo_adapter".to_string(),
1601 request_timeout_ms: 5000,
1602 security_protocol: None,
1603 sasl_mechanism: None,
1604 sasl_username: None,
1605 sasl_password: None,
1606 nodes_number: None,
1607 }
1608 }
1609}
1610
1611impl Default for CacheSettings {
1612 fn default() -> Self {
1613 Self {
1614 enabled: true,
1615 ttl: 300,
1616 }
1617 }
1618}
1619
1620impl Default for MemoryCacheOptions {
1621 fn default() -> Self {
1622 Self {
1623 ttl: 300,
1624 cleanup_interval: 60,
1625 max_capacity: 10000,
1626 }
1627 }
1628}
1629
1630impl Default for CacheConfig {
1631 fn default() -> Self {
1632 Self {
1633 driver: CacheDriver::default(),
1634 redis: RedisConfig {
1635 prefix: Some("sockudo_cache:".to_string()),
1636 url_override: None,
1637 cluster_mode: false,
1638 },
1639 memory: MemoryCacheOptions::default(),
1640 }
1641 }
1642}
1643
1644impl Default for ChannelLimits {
1645 fn default() -> Self {
1646 Self {
1647 max_name_length: 200,
1648 cache_ttl: 3600,
1649 }
1650 }
1651}
1652impl Default for CorsConfig {
1653 fn default() -> Self {
1654 Self {
1655 credentials: true,
1656 origin: vec!["*".to_string()],
1657 methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
1658 allowed_headers: vec![
1659 "Authorization".to_string(),
1660 "Content-Type".to_string(),
1661 "X-Requested-With".to_string(),
1662 "Accept".to_string(),
1663 ],
1664 }
1665 }
1666}
1667
1668impl Default for DatabaseConnection {
1669 fn default() -> Self {
1670 Self {
1671 host: "localhost".to_string(),
1672 port: 3306,
1673 username: "root".to_string(),
1674 password: "".to_string(),
1675 database: "sockudo".to_string(),
1676 table_name: "applications".to_string(),
1677 connection_pool_size: 10,
1678 pool_min: None,
1679 pool_max: None,
1680 cache_ttl: 300,
1681 cache_cleanup_interval: 60,
1682 cache_max_capacity: 100,
1683 }
1684 }
1685}
1686
1687impl Default for RedisConnection {
1688 fn default() -> Self {
1689 Self {
1690 host: "127.0.0.1".to_string(),
1691 port: 6379,
1692 db: 0,
1693 username: None,
1694 password: None,
1695 key_prefix: "sockudo:".to_string(),
1696 sentinels: Vec::new(),
1697 sentinel_password: None,
1698 name: "mymaster".to_string(),
1699 cluster: RedisClusterConnection::default(),
1700 cluster_nodes: Vec::new(),
1701 }
1702 }
1703}
1704
1705impl Default for RedisSentinel {
1706 fn default() -> Self {
1707 Self {
1708 host: "localhost".to_string(),
1709 port: 26379,
1710 }
1711 }
1712}
1713
1714impl Default for ClusterNode {
1715 fn default() -> Self {
1716 Self {
1717 host: "127.0.0.1".to_string(),
1718 port: 7000,
1719 }
1720 }
1721}
1722
1723impl Default for DatabasePooling {
1724 fn default() -> Self {
1725 Self {
1726 enabled: true,
1727 min: 2,
1728 max: 10,
1729 }
1730 }
1731}
1732
1733impl Default for EventLimits {
1734 fn default() -> Self {
1735 Self {
1736 max_channels_at_once: 100,
1737 max_name_length: 200,
1738 max_payload_in_kb: 100,
1739 max_batch_size: 10,
1740 }
1741 }
1742}
1743
1744impl Default for IdempotencyConfig {
1745 fn default() -> Self {
1746 Self {
1747 enabled: true,
1748 ttl_seconds: 120,
1749 max_key_length: 128,
1750 }
1751 }
1752}
1753
1754impl Default for EphemeralConfig {
1755 fn default() -> Self {
1756 Self { enabled: true }
1757 }
1758}
1759
1760impl Default for EchoControlConfig {
1761 fn default() -> Self {
1762 Self {
1763 enabled: true,
1764 default_echo_messages: true,
1765 }
1766 }
1767}
1768
1769impl Default for EventNameFilteringConfig {
1770 fn default() -> Self {
1771 Self {
1772 enabled: true,
1773 max_events_per_filter: 50,
1774 max_event_name_length: 200,
1775 }
1776 }
1777}
1778
1779impl Default for ConnectionRecoveryConfig {
1780 fn default() -> Self {
1781 Self {
1782 enabled: false,
1783 buffer_ttl_seconds: 120,
1784 max_buffer_size: 100,
1785 }
1786 }
1787}
1788
1789impl Default for HttpApiConfig {
1790 fn default() -> Self {
1791 Self {
1792 request_limit_in_mb: 10,
1793 accept_traffic: AcceptTraffic::default(),
1794 usage_enabled: true,
1795 }
1796 }
1797}
1798
1799impl Default for AcceptTraffic {
1800 fn default() -> Self {
1801 Self {
1802 memory_threshold: 0.90,
1803 }
1804 }
1805}
1806
1807impl Default for InstanceConfig {
1808 fn default() -> Self {
1809 Self {
1810 process_id: uuid::Uuid::new_v4().to_string(),
1811 }
1812 }
1813}
1814
1815impl Default for LoggingConfig {
1816 fn default() -> Self {
1817 Self {
1818 colors_enabled: true,
1819 include_target: true,
1820 }
1821 }
1822}
1823
1824impl Default for MetricsConfig {
1825 fn default() -> Self {
1826 Self {
1827 enabled: true,
1828 driver: MetricsDriver::default(),
1829 host: "0.0.0.0".to_string(),
1830 prometheus: PrometheusConfig::default(),
1831 port: 9601,
1832 }
1833 }
1834}
1835
1836impl Default for PrometheusConfig {
1837 fn default() -> Self {
1838 Self {
1839 prefix: "sockudo_".to_string(),
1840 }
1841 }
1842}
1843
1844impl Default for PresenceConfig {
1845 fn default() -> Self {
1846 Self {
1847 max_members_per_channel: 100,
1848 max_member_size_in_kb: 2,
1849 }
1850 }
1851}
1852
1853impl Default for RedisQueueConfig {
1854 fn default() -> Self {
1855 Self {
1856 concurrency: 5,
1857 prefix: Some("sockudo_queue:".to_string()),
1858 url_override: None,
1859 cluster_mode: false,
1860 }
1861 }
1862}
1863
1864impl Default for RateLimit {
1865 fn default() -> Self {
1866 Self {
1867 max_requests: 60,
1868 window_seconds: 60,
1869 identifier: Some("default".to_string()),
1870 trust_hops: Some(0),
1871 }
1872 }
1873}
1874
1875impl Default for RateLimiterConfig {
1876 fn default() -> Self {
1877 Self {
1878 enabled: true,
1879 driver: CacheDriver::Memory,
1880 api_rate_limit: RateLimit {
1881 max_requests: 100,
1882 window_seconds: 60,
1883 identifier: Some("api".to_string()),
1884 trust_hops: Some(0),
1885 },
1886 websocket_rate_limit: RateLimit {
1887 max_requests: 20,
1888 window_seconds: 60,
1889 identifier: Some("websocket_connect".to_string()),
1890 trust_hops: Some(0),
1891 },
1892 redis: RedisConfig {
1893 prefix: Some("sockudo_rl:".to_string()),
1894 url_override: None,
1895 cluster_mode: false,
1896 },
1897 }
1898 }
1899}
1900
1901impl Default for SslConfig {
1902 fn default() -> Self {
1903 Self {
1904 enabled: false,
1905 cert_path: "".to_string(),
1906 key_path: "".to_string(),
1907 passphrase: None,
1908 ca_path: None,
1909 redirect_http: false,
1910 http_port: Some(80),
1911 }
1912 }
1913}
1914
1915impl Default for BatchingConfig {
1916 fn default() -> Self {
1917 Self {
1918 enabled: true,
1919 duration: 50,
1920 size: 100,
1921 }
1922 }
1923}
1924
1925impl Default for ClusterHealthConfig {
1926 fn default() -> Self {
1927 Self {
1928 enabled: true,
1929 heartbeat_interval_ms: 10000,
1930 node_timeout_ms: 30000,
1931 cleanup_interval_ms: 10000,
1932 }
1933 }
1934}
1935
1936impl Default for UnixSocketConfig {
1937 fn default() -> Self {
1938 Self {
1939 enabled: false,
1940 path: "/var/run/sockudo/sockudo.sock".to_string(),
1941 permission_mode: 0o660,
1942 }
1943 }
1944}
1945
1946impl Default for DeltaCompressionOptionsConfig {
1947 fn default() -> Self {
1948 Self {
1949 enabled: true,
1950 algorithm: "fossil".to_string(),
1951 full_message_interval: 10,
1952 min_message_size: 100,
1953 max_state_age_secs: 300,
1954 max_channel_states_per_socket: 100,
1955 max_conflation_states_per_channel: Some(100),
1956 conflation_key_path: None,
1957 cluster_coordination: false,
1958 omit_delta_algorithm: false,
1959 }
1960 }
1961}
1962
1963impl ClusterHealthConfig {
1964 pub fn validate(&self) -> Result<(), String> {
1965 if self.heartbeat_interval_ms == 0 {
1966 return Err("heartbeat_interval_ms must be greater than 0".to_string());
1967 }
1968 if self.node_timeout_ms == 0 {
1969 return Err("node_timeout_ms must be greater than 0".to_string());
1970 }
1971 if self.cleanup_interval_ms == 0 {
1972 return Err("cleanup_interval_ms must be greater than 0".to_string());
1973 }
1974
1975 if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
1976 return Err(format!(
1977 "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
1978 self.heartbeat_interval_ms,
1979 self.node_timeout_ms,
1980 self.node_timeout_ms / 3
1981 ));
1982 }
1983
1984 if self.cleanup_interval_ms > self.node_timeout_ms {
1985 return Err(format!(
1986 "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
1987 self.cleanup_interval_ms, self.node_timeout_ms
1988 ));
1989 }
1990
1991 Ok(())
1992 }
1993}
1994
1995impl ServerOptions {
1996 pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
1997 let content = tokio::fs::read_to_string(path).await?;
1998 let options: Self = if path.ends_with(".toml") {
1999 toml::from_str(&content)?
2000 } else {
2001 sonic_rs::from_str(&content)?
2003 };
2004 Ok(options)
2005 }
2006
2007 pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
2008 if let Ok(mode) = std::env::var("ENVIRONMENT") {
2010 self.mode = mode;
2011 }
2012 self.debug = parse_bool_env("DEBUG_MODE", self.debug);
2013 if parse_bool_env("DEBUG", false) {
2014 self.debug = true;
2015 info!("DEBUG environment variable forces debug mode ON");
2016 }
2017
2018 self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
2019
2020 if let Ok(host) = std::env::var("HOST") {
2021 self.host = host;
2022 }
2023 self.port = parse_env::<u16>("PORT", self.port);
2024 self.shutdown_grace_period =
2025 parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
2026 self.user_authentication_timeout = parse_env::<u64>(
2027 "USER_AUTHENTICATION_TIMEOUT",
2028 self.user_authentication_timeout,
2029 );
2030 self.websocket_max_payload_kb =
2031 parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
2032 if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
2033 self.instance.process_id = id;
2034 }
2035
2036 if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
2038 self.adapter.driver =
2039 parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
2040 }
2041 self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
2042 "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
2043 self.adapter.buffer_multiplier_per_cpu,
2044 );
2045 self.adapter.enable_socket_counting = parse_env::<bool>(
2046 "ADAPTER_ENABLE_SOCKET_COUNTING",
2047 self.adapter.enable_socket_counting,
2048 );
2049 if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
2050 self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
2051 }
2052 if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
2053 self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
2054 }
2055 if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
2056 self.app_manager.driver =
2057 parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
2058 }
2059 if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
2060 self.rate_limiter.driver = parse_driver_enum(
2061 driver_str,
2062 self.rate_limiter.driver.clone(),
2063 "RateLimiter Backend",
2064 );
2065 }
2066
2067 if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
2069 self.database.redis.host = host;
2070 }
2071 self.database.redis.port =
2072 parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
2073 if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
2074 self.database.redis.username = if username.is_empty() {
2075 None
2076 } else {
2077 Some(username)
2078 };
2079 }
2080 if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
2081 self.database.redis.password = Some(password);
2082 }
2083 self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
2084 if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
2085 self.database.redis.key_prefix = prefix;
2086 }
2087 if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
2088 self.database.redis.cluster.username = if cluster_username.is_empty() {
2089 None
2090 } else {
2091 Some(cluster_username)
2092 };
2093 }
2094 if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
2095 self.database.redis.cluster.password = Some(cluster_password);
2096 }
2097 self.database.redis.cluster.use_tls = parse_bool_env(
2098 "DATABASE_REDIS_CLUSTER_USE_TLS",
2099 self.database.redis.cluster.use_tls,
2100 );
2101
2102 if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
2104 self.database.mysql.host = host;
2105 }
2106 self.database.mysql.port =
2107 parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
2108 if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
2109 self.database.mysql.username = user;
2110 }
2111 if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
2112 self.database.mysql.password = pass;
2113 }
2114 if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
2115 self.database.mysql.database = db;
2116 }
2117 if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
2118 self.database.mysql.table_name = table;
2119 }
2120 override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
2121
2122 if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
2124 self.database.postgres.host = host;
2125 }
2126 self.database.postgres.port =
2127 parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
2128 if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
2129 self.database.postgres.username = user;
2130 }
2131 if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
2132 self.database.postgres.password = pass;
2133 }
2134 if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
2135 self.database.postgres.database = db;
2136 }
2137 override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
2138
2139 if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
2141 self.database.dynamodb.region = region;
2142 }
2143 if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
2144 self.database.dynamodb.table_name = table;
2145 }
2146 if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
2147 self.database.dynamodb.endpoint_url = Some(endpoint);
2148 }
2149 if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
2150 self.database.dynamodb.aws_access_key_id = Some(key_id);
2151 }
2152 if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
2153 self.database.dynamodb.aws_secret_access_key = Some(secret);
2154 }
2155
2156 if let Ok(url) = std::env::var("DATABASE_SURREALDB_URL") {
2158 self.database.surrealdb.url = url;
2159 }
2160 if let Ok(namespace) = std::env::var("DATABASE_SURREALDB_NAMESPACE") {
2161 self.database.surrealdb.namespace = namespace;
2162 }
2163 if let Ok(database) = std::env::var("DATABASE_SURREALDB_DATABASE") {
2164 self.database.surrealdb.database = database;
2165 }
2166 if let Ok(username) = std::env::var("DATABASE_SURREALDB_USERNAME") {
2167 self.database.surrealdb.username = username;
2168 }
2169 if let Ok(password) = std::env::var("DATABASE_SURREALDB_PASSWORD") {
2170 self.database.surrealdb.password = password;
2171 }
2172 if let Ok(table) = std::env::var("DATABASE_SURREALDB_TABLE_NAME") {
2173 self.database.surrealdb.table_name = table;
2174 }
2175
2176 let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
2178 let node_list: Vec<String> = nodes
2179 .split(',')
2180 .map(|s| s.trim())
2181 .filter(|s| !s.is_empty())
2182 .map(ToString::to_string)
2183 .collect();
2184
2185 options.adapter.cluster.nodes = node_list.clone();
2186 options.queue.redis_cluster.nodes = node_list.clone();
2187
2188 let parsed_nodes: Vec<ClusterNode> = node_list
2189 .iter()
2190 .filter_map(|seed| ClusterNode::from_seed(seed))
2191 .collect();
2192 options.database.redis.cluster.nodes = parsed_nodes.clone();
2193 options.database.redis.cluster_nodes = parsed_nodes;
2194 };
2195
2196 if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
2197 apply_redis_cluster_nodes(self, &nodes);
2198 }
2199 if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
2200 apply_redis_cluster_nodes(self, &nodes);
2201 }
2202 self.queue.redis_cluster.concurrency = parse_env::<u32>(
2203 "REDIS_CLUSTER_QUEUE_CONCURRENCY",
2204 self.queue.redis_cluster.concurrency,
2205 );
2206 if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
2207 self.queue.redis_cluster.prefix = Some(prefix);
2208 }
2209
2210 self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
2212 if let Ok(val) = std::env::var("SSL_CERT_PATH") {
2213 self.ssl.cert_path = val;
2214 }
2215 if let Ok(val) = std::env::var("SSL_KEY_PATH") {
2216 self.ssl.key_path = val;
2217 }
2218 self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
2219 if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
2220 self.ssl.http_port = Some(port);
2221 }
2222
2223 self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
2225 if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
2226 self.unix_socket.path = path;
2227 }
2228 if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
2229 if mode_str.chars().all(|c| c.is_digit(8)) {
2230 if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
2231 if mode <= 0o777 {
2232 self.unix_socket.permission_mode = mode;
2233 } else {
2234 warn!(
2235 "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
2236 mode_str, self.unix_socket.permission_mode
2237 );
2238 }
2239 } else {
2240 warn!(
2241 "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
2242 mode_str, self.unix_socket.permission_mode
2243 );
2244 }
2245 } else {
2246 warn!(
2247 "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
2248 mode_str, self.unix_socket.permission_mode
2249 );
2250 }
2251 }
2252
2253 if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
2255 self.metrics.driver =
2256 parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
2257 }
2258 self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
2259 if let Ok(val) = std::env::var("METRICS_HOST") {
2260 self.metrics.host = val;
2261 }
2262 self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
2263 if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
2264 self.metrics.prometheus.prefix = val;
2265 }
2266
2267 self.http_api.usage_enabled =
2269 parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
2270
2271 self.rate_limiter.enabled =
2273 parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
2274 self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
2275 "RATE_LIMITER_API_MAX_REQUESTS",
2276 self.rate_limiter.api_rate_limit.max_requests,
2277 );
2278 self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
2279 "RATE_LIMITER_API_WINDOW_SECONDS",
2280 self.rate_limiter.api_rate_limit.window_seconds,
2281 );
2282 if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
2283 self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
2284 }
2285 self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
2286 "RATE_LIMITER_WS_MAX_REQUESTS",
2287 self.rate_limiter.websocket_rate_limit.max_requests,
2288 );
2289 self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
2290 "RATE_LIMITER_WS_WINDOW_SECONDS",
2291 self.rate_limiter.websocket_rate_limit.window_seconds,
2292 );
2293 if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
2294 self.rate_limiter.redis.prefix = Some(prefix);
2295 }
2296
2297 self.queue.redis.concurrency =
2299 parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
2300 if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
2301 self.queue.redis.prefix = Some(prefix);
2302 }
2303
2304 if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
2306 self.queue.sqs.region = region;
2307 }
2308 self.queue.sqs.visibility_timeout = parse_env::<i32>(
2309 "QUEUE_SQS_VISIBILITY_TIMEOUT",
2310 self.queue.sqs.visibility_timeout,
2311 );
2312 self.queue.sqs.max_messages =
2313 parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
2314 self.queue.sqs.wait_time_seconds = parse_env::<i32>(
2315 "QUEUE_SQS_WAIT_TIME_SECONDS",
2316 self.queue.sqs.wait_time_seconds,
2317 );
2318 self.queue.sqs.concurrency =
2319 parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
2320 self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
2321 if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
2322 self.queue.sqs.endpoint_url = Some(endpoint);
2323 }
2324
2325 if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
2327 self.queue.sns.region = region;
2328 }
2329 if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
2330 self.queue.sns.topic_arn = topic_arn;
2331 }
2332 if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
2333 self.queue.sns.endpoint_url = Some(endpoint);
2334 }
2335
2336 self.webhooks.batching.enabled =
2338 parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
2339 self.webhooks.batching.duration =
2340 parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
2341 self.webhooks.batching.size =
2342 parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
2343
2344 if let Ok(servers) = std::env::var("NATS_SERVERS") {
2346 self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2347 }
2348 if let Ok(user) = std::env::var("NATS_USERNAME") {
2349 self.adapter.nats.username = Some(user);
2350 }
2351 if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2352 self.adapter.nats.password = Some(pass);
2353 }
2354 if let Ok(token) = std::env::var("NATS_TOKEN") {
2355 self.adapter.nats.token = Some(token);
2356 }
2357 if let Ok(prefix) = std::env::var("NATS_PREFIX") {
2358 self.adapter.nats.prefix = prefix;
2359 }
2360 self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2361 "NATS_CONNECTION_TIMEOUT_MS",
2362 self.adapter.nats.connection_timeout_ms,
2363 );
2364 self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2365 "NATS_REQUEST_TIMEOUT_MS",
2366 self.adapter.nats.request_timeout_ms,
2367 );
2368 self.adapter.nats.discovery_max_wait_ms = parse_env::<u64>(
2369 "NATS_DISCOVERY_MAX_WAIT_MS",
2370 self.adapter.nats.discovery_max_wait_ms,
2371 );
2372 self.adapter.nats.discovery_idle_wait_ms = parse_env::<u64>(
2373 "NATS_DISCOVERY_IDLE_WAIT_MS",
2374 self.adapter.nats.discovery_idle_wait_ms,
2375 );
2376 if let Some(nodes) = parse_env_optional::<u32>("NATS_NODES_NUMBER") {
2377 self.adapter.nats.nodes_number = Some(nodes);
2378 }
2379
2380 if let Ok(url) = std::env::var("PULSAR_URL") {
2382 self.adapter.pulsar.url = url;
2383 }
2384 if let Ok(prefix) = std::env::var("PULSAR_PREFIX") {
2385 self.adapter.pulsar.prefix = prefix;
2386 }
2387 if let Ok(token) = std::env::var("PULSAR_TOKEN") {
2388 self.adapter.pulsar.token = Some(token);
2389 }
2390 self.adapter.pulsar.request_timeout_ms = parse_env::<u64>(
2391 "PULSAR_REQUEST_TIMEOUT_MS",
2392 self.adapter.pulsar.request_timeout_ms,
2393 );
2394 if let Some(nodes) = parse_env_optional::<u32>("PULSAR_NODES_NUMBER") {
2395 self.adapter.pulsar.nodes_number = Some(nodes);
2396 }
2397
2398 if let Ok(url) = std::env::var("RABBITMQ_URL") {
2400 self.adapter.rabbitmq.url = url;
2401 }
2402 if let Ok(prefix) = std::env::var("RABBITMQ_PREFIX") {
2403 self.adapter.rabbitmq.prefix = prefix;
2404 }
2405 self.adapter.rabbitmq.connection_timeout_ms = parse_env::<u64>(
2406 "RABBITMQ_CONNECTION_TIMEOUT_MS",
2407 self.adapter.rabbitmq.connection_timeout_ms,
2408 );
2409 self.adapter.rabbitmq.request_timeout_ms = parse_env::<u64>(
2410 "RABBITMQ_REQUEST_TIMEOUT_MS",
2411 self.adapter.rabbitmq.request_timeout_ms,
2412 );
2413 if let Some(nodes) = parse_env_optional::<u32>("RABBITMQ_NODES_NUMBER") {
2414 self.adapter.rabbitmq.nodes_number = Some(nodes);
2415 }
2416
2417 if let Ok(project_id) = std::env::var("GOOGLE_PUBSUB_PROJECT_ID") {
2419 self.adapter.google_pubsub.project_id = project_id;
2420 }
2421 if let Ok(prefix) = std::env::var("GOOGLE_PUBSUB_PREFIX") {
2422 self.adapter.google_pubsub.prefix = prefix;
2423 }
2424 if let Ok(emulator_host) = std::env::var("PUBSUB_EMULATOR_HOST") {
2425 self.adapter.google_pubsub.emulator_host = Some(emulator_host);
2426 }
2427 self.adapter.google_pubsub.request_timeout_ms = parse_env::<u64>(
2428 "GOOGLE_PUBSUB_REQUEST_TIMEOUT_MS",
2429 self.adapter.google_pubsub.request_timeout_ms,
2430 );
2431 if let Some(nodes) = parse_env_optional::<u32>("GOOGLE_PUBSUB_NODES_NUMBER") {
2432 self.adapter.google_pubsub.nodes_number = Some(nodes);
2433 }
2434
2435 if let Ok(brokers) = std::env::var("KAFKA_BROKERS") {
2437 self.adapter.kafka.brokers = brokers.split(',').map(|s| s.trim().to_string()).collect();
2438 }
2439 if let Ok(prefix) = std::env::var("KAFKA_PREFIX") {
2440 self.adapter.kafka.prefix = prefix;
2441 }
2442 if let Ok(protocol) = std::env::var("KAFKA_SECURITY_PROTOCOL") {
2443 self.adapter.kafka.security_protocol = Some(protocol);
2444 }
2445 if let Ok(mechanism) = std::env::var("KAFKA_SASL_MECHANISM") {
2446 self.adapter.kafka.sasl_mechanism = Some(mechanism);
2447 }
2448 if let Ok(username) = std::env::var("KAFKA_SASL_USERNAME") {
2449 self.adapter.kafka.sasl_username = Some(username);
2450 }
2451 if let Ok(password) = std::env::var("KAFKA_SASL_PASSWORD") {
2452 self.adapter.kafka.sasl_password = Some(password);
2453 }
2454 self.adapter.kafka.request_timeout_ms = parse_env::<u64>(
2455 "KAFKA_REQUEST_TIMEOUT_MS",
2456 self.adapter.kafka.request_timeout_ms,
2457 );
2458 if let Some(nodes) = parse_env_optional::<u32>("KAFKA_NODES_NUMBER") {
2459 self.adapter.kafka.nodes_number = Some(nodes);
2460 }
2461
2462 if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2464 let parsed: Vec<String> = origins.split(',').map(|s| s.trim().to_string()).collect();
2465 if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&parsed) {
2466 warn!(
2467 "CORS_ORIGINS contains invalid patterns: {}. Keeping previous CORS origins.",
2468 e
2469 );
2470 } else {
2471 self.cors.origin = parsed;
2472 }
2473 }
2474 if let Ok(methods) = std::env::var("CORS_METHODS") {
2475 self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2476 }
2477 if let Ok(headers) = std::env::var("CORS_HEADERS") {
2478 self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2479 }
2480 self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2481
2482 self.database_pooling.enabled =
2484 parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
2485 if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
2486 self.database_pooling.min = min;
2487 }
2488 if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
2489 self.database_pooling.max = max;
2490 }
2491
2492 if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
2493 self.database.mysql.connection_pool_size = pool_size;
2494 self.database.postgres.connection_pool_size = pool_size;
2495 }
2496 if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
2497 self.app_manager.cache.ttl = cache_ttl;
2498 self.channel_limits.cache_ttl = cache_ttl;
2499 self.database.mysql.cache_ttl = cache_ttl;
2500 self.database.postgres.cache_ttl = cache_ttl;
2501 self.database.surrealdb.cache_ttl = cache_ttl;
2502 self.cache.memory.ttl = cache_ttl;
2503 }
2504 if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
2505 self.database.mysql.cache_cleanup_interval = cleanup_interval;
2506 self.database.postgres.cache_cleanup_interval = cleanup_interval;
2507 self.cache.memory.cleanup_interval = cleanup_interval;
2508 }
2509 if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
2510 self.database.mysql.cache_max_capacity = max_capacity;
2511 self.database.postgres.cache_max_capacity = max_capacity;
2512 self.database.surrealdb.cache_max_capacity = max_capacity;
2513 self.cache.memory.max_capacity = max_capacity;
2514 }
2515
2516 let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
2517 let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
2518 let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
2519 let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
2520
2521 if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
2522 (default_app_id, default_app_key, default_app_secret)
2523 && default_app_enabled
2524 {
2525 let default_app = App::from_policy(
2526 app_id,
2527 app_key,
2528 app_secret,
2529 default_app_enabled,
2530 crate::app::AppPolicy {
2531 limits: crate::app::AppLimitsPolicy {
2532 max_connections: parse_env::<u32>(
2533 "SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS",
2534 100,
2535 ),
2536 max_backend_events_per_second: Some(parse_env::<u32>(
2537 "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
2538 100,
2539 )),
2540 max_client_events_per_second: parse_env::<u32>(
2541 "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
2542 100,
2543 ),
2544 max_read_requests_per_second: Some(parse_env::<u32>(
2545 "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
2546 100,
2547 )),
2548 max_presence_members_per_channel: Some(parse_env::<u32>(
2549 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
2550 100,
2551 )),
2552 max_presence_member_size_in_kb: Some(parse_env::<u32>(
2553 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
2554 100,
2555 )),
2556 max_channel_name_length: Some(parse_env::<u32>(
2557 "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
2558 100,
2559 )),
2560 max_event_channels_at_once: Some(parse_env::<u32>(
2561 "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
2562 100,
2563 )),
2564 max_event_name_length: Some(parse_env::<u32>(
2565 "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
2566 100,
2567 )),
2568 max_event_payload_in_kb: Some(parse_env::<u32>(
2569 "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
2570 100,
2571 )),
2572 max_event_batch_size: Some(parse_env::<u32>(
2573 "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
2574 100,
2575 )),
2576 decay_seconds: None,
2577 terminate_on_limit: false,
2578 message_rate_limit: None,
2579 },
2580 features: crate::app::AppFeaturesPolicy {
2581 enable_client_messages: std::env::var(
2582 "SOCKUDO_DEFAULT_APP_ENABLE_CLIENT_MESSAGES",
2583 )
2584 .ok()
2585 .map(|value| {
2586 matches!(
2587 value.trim().to_ascii_lowercase().as_str(),
2588 "true" | "1" | "yes" | "on"
2589 )
2590 })
2591 .unwrap_or_else(|| parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false)),
2592 enable_user_authentication: Some(parse_bool_env(
2593 "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
2594 false,
2595 )),
2596 enable_watchlist_events: Some(parse_bool_env(
2597 "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
2598 false,
2599 )),
2600 },
2601 channels: crate::app::AppChannelsPolicy {
2602 allowed_origins: {
2603 if let Ok(origins_str) =
2604 std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS")
2605 {
2606 if !origins_str.is_empty() {
2607 Some(
2608 origins_str
2609 .split(',')
2610 .map(|s| s.trim().to_string())
2611 .collect(),
2612 )
2613 } else {
2614 None
2615 }
2616 } else {
2617 None
2618 }
2619 },
2620 channel_delta_compression: None,
2621 channel_namespaces: None,
2622 },
2623 webhooks: None,
2624 idempotency: None,
2625 connection_recovery: None,
2626 },
2627 );
2628
2629 self.app_manager.array.apps.push(default_app);
2630 info!("Successfully registered default app from env");
2631 }
2632
2633 if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
2635 info!("Applying REDIS_URL environment variable override");
2636
2637 let redis_url_json = sonic_rs::json!(redis_url_env);
2638
2639 self.adapter
2640 .redis
2641 .redis_pub_options
2642 .insert("url".to_string(), redis_url_json.clone());
2643 self.adapter
2644 .redis
2645 .redis_sub_options
2646 .insert("url".to_string(), redis_url_json);
2647
2648 self.cache.redis.url_override = Some(redis_url_env.clone());
2649 self.queue.redis.url_override = Some(redis_url_env.clone());
2650 self.rate_limiter.redis.url_override = Some(redis_url_env);
2651 }
2652
2653 let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
2655 let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
2656 if has_colors_env || has_target_env {
2657 let logging_config = self.logging.get_or_insert_with(Default::default);
2658 if has_colors_env {
2659 logging_config.colors_enabled =
2660 parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
2661 }
2662 if has_target_env {
2663 logging_config.include_target =
2664 parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
2665 }
2666 }
2667
2668 self.cleanup.async_enabled =
2670 parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
2671 self.cleanup.fallback_to_sync =
2672 parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
2673 self.cleanup.queue_buffer_size =
2674 parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
2675 self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
2676 self.cleanup.batch_timeout_ms =
2677 parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
2678 if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
2679 self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
2680 WorkerThreadsConfig::Auto
2681 } else if let Ok(n) = worker_threads_str.parse::<usize>() {
2682 WorkerThreadsConfig::Fixed(n)
2683 } else {
2684 warn!(
2685 "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
2686 worker_threads_str
2687 );
2688 self.cleanup.worker_threads.clone()
2689 };
2690 }
2691 self.cleanup.max_retry_attempts = parse_env::<u32>(
2692 "CLEANUP_MAX_RETRY_ATTEMPTS",
2693 self.cleanup.max_retry_attempts,
2694 );
2695
2696 self.cluster_health.enabled =
2698 parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
2699 self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
2700 "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
2701 self.cluster_health.heartbeat_interval_ms,
2702 );
2703 self.cluster_health.node_timeout_ms = parse_env::<u64>(
2704 "CLUSTER_HEALTH_NODE_TIMEOUT",
2705 self.cluster_health.node_timeout_ms,
2706 );
2707 self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
2708 "CLUSTER_HEALTH_CLEANUP_INTERVAL",
2709 self.cluster_health.cleanup_interval_ms,
2710 );
2711
2712 self.tag_filtering.enabled =
2714 parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
2715
2716 if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
2718 if val.to_lowercase() == "none" || val == "0" {
2719 self.websocket.max_messages = None;
2720 } else if let Ok(n) = val.parse::<usize>() {
2721 self.websocket.max_messages = Some(n);
2722 }
2723 }
2724 if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
2725 if val.to_lowercase() == "none" || val == "0" {
2726 self.websocket.max_bytes = None;
2727 } else if let Ok(n) = val.parse::<usize>() {
2728 self.websocket.max_bytes = Some(n);
2729 }
2730 }
2731 self.websocket.disconnect_on_buffer_full = parse_bool_env(
2732 "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
2733 self.websocket.disconnect_on_buffer_full,
2734 );
2735 self.websocket.max_message_size = parse_env::<usize>(
2736 "WEBSOCKET_MAX_MESSAGE_SIZE",
2737 self.websocket.max_message_size,
2738 );
2739 self.websocket.max_frame_size =
2740 parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
2741 self.websocket.write_buffer_size = parse_env::<usize>(
2742 "WEBSOCKET_WRITE_BUFFER_SIZE",
2743 self.websocket.write_buffer_size,
2744 );
2745 self.websocket.max_backpressure = parse_env::<usize>(
2746 "WEBSOCKET_MAX_BACKPRESSURE",
2747 self.websocket.max_backpressure,
2748 );
2749 self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
2750 self.websocket.ping_interval =
2751 parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
2752 self.websocket.idle_timeout =
2753 parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
2754 if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
2755 self.websocket.compression = mode;
2756 }
2757
2758 self.connection_recovery.enabled = parse_bool_env(
2760 "CONNECTION_RECOVERY_ENABLED",
2761 self.connection_recovery.enabled,
2762 );
2763 self.connection_recovery.buffer_ttl_seconds = parse_env::<u64>(
2764 "CONNECTION_RECOVERY_BUFFER_TTL",
2765 self.connection_recovery.buffer_ttl_seconds,
2766 );
2767 self.connection_recovery.max_buffer_size = parse_env::<usize>(
2768 "CONNECTION_RECOVERY_MAX_BUFFER_SIZE",
2769 self.connection_recovery.max_buffer_size,
2770 );
2771
2772 self.idempotency.enabled = parse_bool_env("IDEMPOTENCY_ENABLED", self.idempotency.enabled);
2773 self.idempotency.ttl_seconds =
2774 parse_env::<u64>("IDEMPOTENCY_TTL_SECONDS", self.idempotency.ttl_seconds);
2775 self.idempotency.max_key_length = parse_env::<usize>(
2776 "IDEMPOTENCY_MAX_KEY_LENGTH",
2777 self.idempotency.max_key_length,
2778 );
2779
2780 self.ephemeral.enabled = parse_bool_env("EPHEMERAL_ENABLED", self.ephemeral.enabled);
2781
2782 self.echo_control.enabled =
2783 parse_bool_env("ECHO_CONTROL_ENABLED", self.echo_control.enabled);
2784 self.echo_control.default_echo_messages = parse_bool_env(
2785 "ECHO_CONTROL_DEFAULT_ECHO_MESSAGES",
2786 self.echo_control.default_echo_messages,
2787 );
2788
2789 self.event_name_filtering.enabled = parse_bool_env(
2790 "EVENT_NAME_FILTERING_ENABLED",
2791 self.event_name_filtering.enabled,
2792 );
2793 self.event_name_filtering.max_events_per_filter = parse_env::<usize>(
2794 "EVENT_NAME_FILTERING_MAX_EVENTS_PER_FILTER",
2795 self.event_name_filtering.max_events_per_filter,
2796 );
2797 self.event_name_filtering.max_event_name_length = parse_env::<usize>(
2798 "EVENT_NAME_FILTERING_MAX_EVENT_NAME_LENGTH",
2799 self.event_name_filtering.max_event_name_length,
2800 );
2801
2802 Ok(())
2803 }
2804
2805 pub fn validate(&self) -> Result<(), String> {
2806 if self.unix_socket.enabled {
2807 if self.unix_socket.path.is_empty() {
2808 return Err(
2809 "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
2810 );
2811 }
2812
2813 self.validate_unix_socket_security()?;
2814
2815 if self.ssl.enabled {
2816 tracing::warn!(
2817 "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
2818 );
2819 }
2820
2821 if self.unix_socket.permission_mode > 0o777 {
2822 return Err(format!(
2823 "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
2824 self.unix_socket.permission_mode
2825 ));
2826 }
2827 }
2828
2829 if let Err(e) = self.cleanup.validate() {
2830 return Err(format!("Invalid cleanup configuration: {}", e));
2831 }
2832
2833 Ok(())
2834 }
2835
2836 fn validate_unix_socket_security(&self) -> Result<(), String> {
2837 let path = &self.unix_socket.path;
2838
2839 if path.contains("../") || path.contains("..\\") {
2840 return Err(
2841 "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
2842 );
2843 }
2844
2845 if self.unix_socket.permission_mode & 0o002 != 0 {
2846 warn!(
2847 "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
2848 self.unix_socket.permission_mode
2849 );
2850 }
2851
2852 if self.unix_socket.permission_mode & 0o007 > 0o005 {
2853 warn!(
2854 "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
2855 self.unix_socket.permission_mode
2856 );
2857 }
2858
2859 if self.mode == "production" && path.starts_with("/tmp/") {
2860 warn!(
2861 "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
2862 path
2863 );
2864 }
2865
2866 if !path.starts_with('/') {
2867 return Err(
2868 "Unix socket path must be absolute (start with /) for security and reliability."
2869 .to_string(),
2870 );
2871 }
2872
2873 Ok(())
2874 }
2875}
2876
#[cfg(test)]
mod redis_connection_tests {
    use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};

    /// Baseline standalone connection with every field set explicitly, so the
    /// URL expectations below do not depend on `RedisConnection::default()`.
    fn local_connection() -> RedisConnection {
        RedisConnection {
            host: "127.0.0.1".to_string(),
            port: 6379,
            db: 0,
            username: None,
            password: None,
            key_prefix: "sockudo:".to_string(),
            sentinels: Vec::new(),
            sentinel_password: None,
            name: "mymaster".to_string(),
            cluster: RedisClusterConnection::default(),
            cluster_nodes: Vec::new(),
        }
    }

    /// Sentinel endpoint for `host` on port 26379.
    fn sentinel(host: &str) -> RedisSentinel {
        RedisSentinel {
            host: host.to_string(),
            port: 26379,
        }
    }

    // --- Standalone URLs ---

    #[test]
    fn test_standard_url_basic() {
        let redis = local_connection();
        assert_eq!(redis.to_url(), "redis://127.0.0.1:6379/0");
    }

    #[test]
    fn test_standard_url_with_password() {
        let redis = RedisConnection {
            db: 2,
            password: Some("secret".to_string()),
            ..local_connection()
        };
        assert_eq!(redis.to_url(), "redis://:secret@127.0.0.1:6379/2");
    }

    #[test]
    fn test_standard_url_with_username_and_password() {
        let redis = RedisConnection {
            host: "redis.example.com".to_string(),
            port: 6380,
            db: 1,
            username: Some("admin".to_string()),
            password: Some("pass123".to_string()),
            ..local_connection()
        };
        assert_eq!(
            redis.to_url(),
            "redis://admin:pass123@redis.example.com:6380/1"
        );
    }

    #[test]
    fn test_standard_url_with_special_chars_in_password() {
        let redis = RedisConnection {
            password: Some("pass@word#123".to_string()),
            ..local_connection()
        };
        // `@` and `#` are expected to come back percent-encoded.
        assert_eq!(
            redis.to_url(),
            "redis://:pass%40word%23123@127.0.0.1:6379/0"
        );
    }

    // --- Sentinel detection and URLs ---

    #[test]
    fn test_is_sentinel_configured_false() {
        assert!(!RedisConnection::default().is_sentinel_configured());
    }

    #[test]
    fn test_is_sentinel_configured_true() {
        let redis = RedisConnection {
            sentinels: vec![sentinel("sentinel1")],
            ..Default::default()
        };
        assert!(redis.is_sentinel_configured());
    }

    #[test]
    fn test_sentinel_url_basic() {
        let redis = RedisConnection {
            sentinels: vec![sentinel("sentinel1"), sentinel("sentinel2")],
            ..local_connection()
        };
        assert_eq!(
            redis.to_url(),
            "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0"
        );
    }

    #[test]
    fn test_sentinel_url_with_sentinel_password() {
        let redis = RedisConnection {
            sentinels: vec![sentinel("sentinel1")],
            sentinel_password: Some("sentinelpass".to_string()),
            ..local_connection()
        };
        assert_eq!(
            redis.to_url(),
            "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0"
        );
    }

    #[test]
    fn test_sentinel_url_with_master_password() {
        let redis = RedisConnection {
            db: 1,
            password: Some("masterpass".to_string()),
            sentinels: vec![sentinel("sentinel1")],
            ..local_connection()
        };
        assert_eq!(
            redis.to_url(),
            "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass"
        );
    }

    #[test]
    fn test_sentinel_url_with_all_auth() {
        let redis = RedisConnection {
            db: 2,
            username: Some("redisuser".to_string()),
            password: Some("redispass".to_string()),
            sentinels: vec![
                sentinel("sentinel1"),
                RedisSentinel {
                    host: "sentinel2".to_string(),
                    port: 26380,
                },
            ],
            sentinel_password: Some("sentinelauth".to_string()),
            name: "production-master".to_string(),
            ..local_connection()
        };
        assert_eq!(
            redis.to_url(),
            "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser"
        );
    }

    #[test]
    fn test_sentinel_to_host_port() {
        let endpoint = sentinel("sentinel.example.com");
        assert_eq!(endpoint.to_host_port(), "sentinel.example.com:26379");
    }

    // --- Cluster node URLs ---

    #[test]
    fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
        // One bare host, one explicit `redis://` URL and one explicit `rediss://` URL.
        let nodes = vec![
            ClusterNode {
                host: "node1.secure-cluster.com".to_string(),
                port: 7000,
            },
            ClusterNode {
                host: "redis://node2.secure-cluster.com:7001".to_string(),
                port: 7001,
            },
            ClusterNode {
                host: "rediss://node3.secure-cluster.com".to_string(),
                port: 7002,
            },
        ];
        let redis = RedisConnection {
            cluster: RedisClusterConnection {
                nodes,
                username: None,
                password: Some("cluster-secret".to_string()),
                use_tls: true,
            },
            ..Default::default()
        };

        let expected = vec![
            "rediss://:cluster-secret@node1.secure-cluster.com:7000",
            "redis://:cluster-secret@node2.secure-cluster.com:7001",
            "rediss://:cluster-secret@node3.secure-cluster.com:7002",
        ];
        assert_eq!(redis.cluster_node_urls(), expected);
    }

    #[test]
    fn test_cluster_node_urls_fallback_to_legacy_nodes() {
        // Only the top-level `password` and `cluster_nodes` fields are populated.
        let redis = RedisConnection {
            password: Some("fallback-secret".to_string()),
            cluster_nodes: vec![ClusterNode {
                host: "legacy-node.example.com".to_string(),
                port: 7000,
            }],
            ..Default::default()
        };

        let expected = vec!["redis://:fallback-secret@legacy-node.example.com:7000"];
        assert_eq!(redis.cluster_node_urls(), expected);
    }

    #[test]
    fn test_normalize_cluster_seed_urls() {
        let redis = RedisConnection {
            cluster: RedisClusterConnection {
                nodes: Vec::new(),
                username: Some("svc-user".to_string()),
                password: Some("svc-pass".to_string()),
                use_tls: true,
            },
            ..Default::default()
        };

        let seeds: Vec<String> = [
            "node1.example.com:7000",
            "redis://node2.example.com:7001",
            "rediss://node3.example.com",
        ]
        .iter()
        .map(|seed| seed.to_string())
        .collect();

        let expected = vec![
            "rediss://svc-user:svc-pass@node1.example.com:7000",
            "redis://svc-user:svc-pass@node2.example.com:7001",
            "rediss://svc-user:svc-pass@node3.example.com:6379",
        ];
        assert_eq!(redis.normalize_cluster_seed_urls(&seeds), expected);
    }
}
3171
#[cfg(test)]
mod cluster_node_tests {
    use super::ClusterNode;

    /// Builds a node for `host` whose `port` field is 6379.
    fn node_on_6379(host: &str) -> ClusterNode {
        ClusterNode {
            host: host.to_string(),
            port: 6379,
        }
    }

    // --- Plain hosts ---

    #[test]
    fn test_to_url_basic_host() {
        assert_eq!(node_on_6379("localhost").to_url(), "redis://localhost:6379");
    }

    #[test]
    fn test_to_url_ip_address() {
        assert_eq!(node_on_6379("127.0.0.1").to_url(), "redis://127.0.0.1:6379");
    }

    // --- Hosts that already carry a scheme ---

    #[test]
    fn test_to_url_with_redis_protocol() {
        let url = node_on_6379("redis://example.com").to_url();
        assert_eq!(url, "redis://example.com:6379");
    }

    #[test]
    fn test_to_url_with_rediss_protocol() {
        let url = node_on_6379("rediss://secure.example.com").to_url();
        assert_eq!(url, "rediss://secure.example.com:6379");
    }

    #[test]
    fn test_to_url_with_rediss_protocol_and_port_in_url() {
        // The port embedded in the host string wins over the `port` field.
        let url = node_on_6379("rediss://secure.example.com:7000").to_url();
        assert_eq!(url, "rediss://secure.example.com:7000");
    }

    #[test]
    fn test_to_url_with_redis_protocol_and_port_in_url() {
        let url = node_on_6379("redis://example.com:7001").to_url();
        assert_eq!(url, "redis://example.com:7001");
    }

    #[test]
    fn test_to_url_with_trailing_whitespace() {
        let url = node_on_6379(" rediss://secure.example.com ").to_url();
        assert_eq!(url, "rediss://secure.example.com:6379");
    }

    // --- Non-default ports ---

    #[test]
    fn test_to_url_custom_port() {
        let cluster_node = ClusterNode {
            host: "redis-cluster.example.com".to_string(),
            port: 7000,
        };
        assert_eq!(
            cluster_node.to_url(),
            "redis://redis-cluster.example.com:7000"
        );
    }

    #[test]
    fn test_to_url_plain_host_with_port_in_host_field() {
        let cluster_node = ClusterNode {
            host: "redis-cluster.example.com:7010".to_string(),
            port: 7000,
        };
        assert_eq!(
            cluster_node.to_url(),
            "redis://redis-cluster.example.com:7010"
        );
    }

    // --- Shared auth / TLS options ---

    #[test]
    fn test_to_url_with_options_adds_auth_and_tls() {
        let cluster_node = ClusterNode {
            host: "node.example.com".to_string(),
            port: 7000,
        };
        let url = cluster_node.to_url_with_options(true, Some("svc-user"), Some("secret"));
        assert_eq!(url, "rediss://svc-user:secret@node.example.com:7000");
    }

    #[test]
    fn test_to_url_with_options_keeps_embedded_auth() {
        let cluster_node = ClusterNode {
            host: "rediss://:node-secret@node.example.com:7000".to_string(),
            port: 7000,
        };
        let url =
            cluster_node.to_url_with_options(true, Some("global-user"), Some("global-secret"));
        assert_eq!(url, "rediss://:node-secret@node.example.com:7000");
    }

    // --- Seed parsing ---

    #[test]
    fn test_from_seed_parses_plain_host_port() {
        let parsed = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
        assert_eq!(parsed.host, "cluster-node-1");
        assert_eq!(parsed.port, 7005);
    }

    #[test]
    fn test_from_seed_keeps_scheme_urls() {
        let seed = "rediss://secure.example.com:7005";
        let parsed = ClusterNode::from_seed(seed).expect("node should parse");
        assert_eq!(parsed.host, "rediss://secure.example.com:7005");
        assert_eq!(parsed.port, 7005);
    }

    #[test]
    fn test_to_url_aws_elasticache_hostname() {
        let url = node_on_6379("rediss://my-cluster.use1.cache.amazonaws.com").to_url();
        assert_eq!(url, "rediss://my-cluster.use1.cache.amazonaws.com:6379");
    }

    // --- IPv6 literals ---

    #[test]
    fn test_to_url_with_ipv6_no_port() {
        assert_eq!(
            node_on_6379("rediss://[::1]").to_url(),
            "rediss://[::1]:6379"
        );
    }

    #[test]
    fn test_to_url_with_ipv6_and_port_in_url() {
        assert_eq!(
            node_on_6379("rediss://[::1]:7000").to_url(),
            "rediss://[::1]:7000"
        );
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_no_port() {
        assert_eq!(
            node_on_6379("rediss://[2001:db8::1]").to_url(),
            "rediss://[2001:db8::1]:6379"
        );
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_with_port() {
        assert_eq!(
            node_on_6379("rediss://[2001:db8::1]:7000").to_url(),
            "rediss://[2001:db8::1]:7000"
        );
    }

    #[test]
    fn test_to_url_with_redis_protocol_ipv6() {
        assert_eq!(node_on_6379("redis://[::1]").to_url(), "redis://[::1]:6379");
    }
}
3353
#[cfg(test)]
mod cors_config_tests {
    use super::CorsConfig;

    /// Deserializes a `CorsConfig` from a JSON string via `sonic_rs::from_str`.
    fn cors_from_json(json: &str) -> sonic_rs::Result<CorsConfig> {
        sonic_rs::from_str(json)
    }

    /// Parses `json` (panicking on failure) and returns how many origins it holds.
    fn origin_count(json: &str) -> usize {
        cors_from_json(json).unwrap().origin.len()
    }

    #[test]
    fn test_deserialize_valid_exact_origins() {
        let json = r#"{"origin": ["https://example.com", "https://other.com"]}"#;
        assert_eq!(origin_count(json), 2);
    }

    #[test]
    fn test_deserialize_valid_wildcard_patterns() {
        let json = r#"{"origin": ["*.example.com", "https://*.staging.example.com"]}"#;
        assert_eq!(origin_count(json), 2);
    }

    #[test]
    fn test_deserialize_allows_special_markers() {
        let accepted = [
            r#"{"origin": ["*"]}"#,
            r#"{"origin": ["Any"]}"#,
            r#"{"origin": ["any"]}"#,
            r#"{"origin": ["*", "https://example.com"]}"#,
        ];
        for json in accepted {
            assert!(cors_from_json(json).is_ok());
        }
    }

    #[test]
    fn test_deserialize_rejects_invalid_patterns() {
        let rejected = [
            r#"{"origin": ["*.*bad"]}"#,
            r#"{"origin": ["*."]}"#,
            r#"{"origin": [""]}"#,
            r#"{"origin": ["https://"]}"#,
        ];
        for json in rejected {
            assert!(cors_from_json(json).is_err());
        }
    }

    #[test]
    fn test_deserialize_rejects_mixed_valid_and_invalid() {
        // A single bad pattern is enough to fail the whole list.
        let json = r#"{"origin": ["https://good.com", "*.*bad"]}"#;
        assert!(cors_from_json(json).is_err());
    }
}