1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12 D: serde::Deserializer<'de>,
13{
14 use serde::de::{self};
15
16 let s = String::deserialize(deserializer)?;
17
18 if !s.chars().all(|c| c.is_digit(8)) {
20 return Err(de::Error::custom(format!(
21 "invalid octal permission mode '{}': must contain only digits 0-7",
22 s
23 )));
24 }
25
26 let mode = u32::from_str_radix(&s, 8)
28 .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30 if mode > 0o777 {
32 return Err(de::Error::custom(format!(
33 "permission mode '{}' exceeds maximum value 777",
34 s
35 )));
36 }
37
38 Ok(mode)
39}
40
41fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43 driver_str: String,
44 default_driver: T,
45 driver_name: &str,
46) -> T
47where
48 <T as FromStr>::Err: std::fmt::Debug,
49{
50 match T::from_str(&driver_str.to_lowercase()) {
51 Ok(driver_enum) => driver_enum,
52 Err(e) => {
53 warn!(
54 "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55 driver_name, driver_str, e, default_driver
56 );
57 default_driver
58 }
59 }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63 if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64 db_conn.pool_min = Some(min);
65 }
66 if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67 db_conn.pool_max = Some(max);
68 }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
74#[serde(rename_all = "lowercase")]
75pub enum AdapterDriver {
76 #[default]
77 Local,
78 Redis,
79 #[serde(rename = "redis-cluster")]
80 RedisCluster,
81 Nats,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
85#[serde(default)]
86pub struct DynamoDbSettings {
87 pub region: String,
88 pub table_name: String,
89 pub endpoint_url: Option<String>,
90 pub aws_access_key_id: Option<String>,
91 pub aws_secret_access_key: Option<String>,
92 pub aws_profile_name: Option<String>,
93}
94
95impl Default for DynamoDbSettings {
96 fn default() -> Self {
97 Self {
98 region: "us-east-1".to_string(),
99 table_name: "sockudo-applications".to_string(),
100 endpoint_url: None,
101 aws_access_key_id: None,
102 aws_secret_access_key: None,
103 aws_profile_name: None,
104 }
105 }
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
109#[serde(default)]
110pub struct ScyllaDbSettings {
111 pub nodes: Vec<String>,
112 pub keyspace: String,
113 pub table_name: String,
114 pub username: Option<String>,
115 pub password: Option<String>,
116 pub replication_class: String,
117 pub replication_factor: u32,
118}
119
120impl Default for ScyllaDbSettings {
121 fn default() -> Self {
122 Self {
123 nodes: vec!["127.0.0.1:9042".to_string()],
124 keyspace: "sockudo".to_string(),
125 table_name: "applications".to_string(),
126 username: None,
127 password: None,
128 replication_class: "SimpleStrategy".to_string(),
129 replication_factor: 3,
130 }
131 }
132}
133
134impl FromStr for AdapterDriver {
135 type Err = String;
136 fn from_str(s: &str) -> Result<Self, Self::Err> {
137 match s.to_lowercase().as_str() {
138 "local" => Ok(AdapterDriver::Local),
139 "redis" => Ok(AdapterDriver::Redis),
140 "redis-cluster" => Ok(AdapterDriver::RedisCluster),
141 "nats" => Ok(AdapterDriver::Nats),
142 _ => Err(format!("Unknown adapter driver: {s}")),
143 }
144 }
145}
146
147#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
148#[serde(rename_all = "lowercase")]
149pub enum AppManagerDriver {
150 #[default]
151 Memory,
152 Mysql,
153 Dynamodb,
154 PgSql,
155 ScyllaDb,
156}
157impl FromStr for AppManagerDriver {
158 type Err = String;
159 fn from_str(s: &str) -> Result<Self, Self::Err> {
160 match s.to_lowercase().as_str() {
161 "memory" => Ok(AppManagerDriver::Memory),
162 "mysql" => Ok(AppManagerDriver::Mysql),
163 "dynamodb" => Ok(AppManagerDriver::Dynamodb),
164 "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
165 "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
166 _ => Err(format!("Unknown app manager driver: {s}")),
167 }
168 }
169}
170
171#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
172#[serde(rename_all = "lowercase")]
173pub enum CacheDriver {
174 #[default]
175 Memory,
176 Redis,
177 #[serde(rename = "redis-cluster")]
178 RedisCluster,
179 None,
180}
181
182impl FromStr for CacheDriver {
183 type Err = String;
184 fn from_str(s: &str) -> Result<Self, Self::Err> {
185 match s.to_lowercase().as_str() {
186 "memory" => Ok(CacheDriver::Memory),
187 "redis" => Ok(CacheDriver::Redis),
188 "redis-cluster" => Ok(CacheDriver::RedisCluster),
189 "none" => Ok(CacheDriver::None),
190 _ => Err(format!("Unknown cache driver: {s}")),
191 }
192 }
193}
194
195#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
196#[serde(rename_all = "lowercase")]
197pub enum QueueDriver {
198 Memory,
199 #[default]
200 Redis,
201 #[serde(rename = "redis-cluster")]
202 RedisCluster,
203 Sqs,
204 Sns,
205 None,
206}
207
208impl FromStr for QueueDriver {
209 type Err = String;
210 fn from_str(s: &str) -> Result<Self, Self::Err> {
211 match s.to_lowercase().as_str() {
212 "memory" => Ok(QueueDriver::Memory),
213 "redis" => Ok(QueueDriver::Redis),
214 "redis-cluster" => Ok(QueueDriver::RedisCluster),
215 "sqs" => Ok(QueueDriver::Sqs),
216 "sns" => Ok(QueueDriver::Sns),
217 "none" => Ok(QueueDriver::None),
218 _ => Err(format!("Unknown queue driver: {s}")),
219 }
220 }
221}
222
223impl AsRef<str> for QueueDriver {
224 fn as_ref(&self) -> &str {
225 match self {
226 QueueDriver::Memory => "memory",
227 QueueDriver::Redis => "redis",
228 QueueDriver::RedisCluster => "redis-cluster",
229 QueueDriver::Sqs => "sqs",
230 QueueDriver::Sns => "sns",
231 QueueDriver::None => "none",
232 }
233 }
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize)]
237#[serde(default)]
238pub struct RedisClusterQueueConfig {
239 pub concurrency: u32,
240 pub prefix: Option<String>,
241 pub nodes: Vec<String>,
242 pub request_timeout_ms: u64,
243}
244
245impl Default for RedisClusterQueueConfig {
246 fn default() -> Self {
247 Self {
248 concurrency: 5,
249 prefix: Some("sockudo_queue:".to_string()),
250 nodes: vec!["redis://127.0.0.1:6379".to_string()],
251 request_timeout_ms: 5000,
252 }
253 }
254}
255
256#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
257#[serde(rename_all = "lowercase")]
258pub enum MetricsDriver {
259 #[default]
260 Prometheus,
261}
262
263#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
264#[serde(rename_all = "lowercase")]
265pub enum LogOutputFormat {
266 #[default]
267 Human,
268 Json,
269}
270
271impl FromStr for LogOutputFormat {
272 type Err = String;
273 fn from_str(s: &str) -> Result<Self, Self::Err> {
274 match s.to_lowercase().as_str() {
275 "human" => Ok(LogOutputFormat::Human),
276 "json" => Ok(LogOutputFormat::Json),
277 _ => Err(format!("Unknown log output format: {s}")),
278 }
279 }
280}
281
282impl FromStr for MetricsDriver {
283 type Err = String;
284 fn from_str(s: &str) -> Result<Self, Self::Err> {
285 match s.to_lowercase().as_str() {
286 "prometheus" => Ok(MetricsDriver::Prometheus),
287 _ => Err(format!("Unknown metrics driver: {s}")),
288 }
289 }
290}
291
292impl AsRef<str> for MetricsDriver {
293 fn as_ref(&self) -> &str {
294 match self {
295 MetricsDriver::Prometheus => "prometheus",
296 }
297 }
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize)]
302#[serde(default)]
303pub struct ServerOptions {
304 pub adapter: AdapterConfig,
305 pub app_manager: AppManagerConfig,
306 pub cache: CacheConfig,
307 pub channel_limits: ChannelLimits,
308 pub cors: CorsConfig,
309 pub database: DatabaseConfig,
310 pub database_pooling: DatabasePooling,
311 pub debug: bool,
312 pub event_limits: EventLimits,
313 pub host: String,
314 pub http_api: HttpApiConfig,
315 pub instance: InstanceConfig,
316 pub logging: Option<LoggingConfig>,
317 pub metrics: MetricsConfig,
318 pub mode: String,
319 pub port: u16,
320 pub path_prefix: String,
321 pub presence: PresenceConfig,
322 pub queue: QueueConfig,
323 pub rate_limiter: RateLimiterConfig,
324 pub shutdown_grace_period: u64,
325 pub ssl: SslConfig,
326 pub user_authentication_timeout: u64,
327 pub webhooks: WebhooksConfig,
328 pub websocket_max_payload_kb: u32,
329 pub cleanup: CleanupConfig,
330 pub activity_timeout: u64,
331 pub cluster_health: ClusterHealthConfig,
332 pub unix_socket: UnixSocketConfig,
333 pub delta_compression: DeltaCompressionOptionsConfig,
334 pub tag_filtering: TagFilteringConfig,
335 pub websocket: WebSocketConfig,
336}
337
338#[derive(Debug, Clone, Serialize, Deserialize)]
341#[serde(default)]
342pub struct SqsQueueConfig {
343 pub region: String,
344 pub queue_url_prefix: Option<String>,
345 pub visibility_timeout: i32,
346 pub endpoint_url: Option<String>,
347 pub max_messages: i32,
348 pub wait_time_seconds: i32,
349 pub concurrency: u32,
350 pub fifo: bool,
351 pub message_group_id: Option<String>,
352}
353
354#[derive(Debug, Clone, Serialize, Deserialize)]
355#[serde(default)]
356pub struct SnsQueueConfig {
357 pub region: String,
358 pub topic_arn: String,
359 pub endpoint_url: Option<String>,
360}
361
362#[derive(Debug, Clone, Serialize, Deserialize)]
363#[serde(default)]
364pub struct AdapterConfig {
365 pub driver: AdapterDriver,
366 pub redis: RedisAdapterConfig,
367 pub cluster: RedisClusterAdapterConfig,
368 pub nats: NatsAdapterConfig,
369 #[serde(default = "default_buffer_multiplier_per_cpu")]
370 pub buffer_multiplier_per_cpu: usize,
371 pub cluster_health: ClusterHealthConfig,
372 #[serde(default = "default_enable_socket_counting")]
373 pub enable_socket_counting: bool,
374}
375
376fn default_enable_socket_counting() -> bool {
377 true
378}
379
380fn default_buffer_multiplier_per_cpu() -> usize {
381 64
382}
383
384impl Default for AdapterConfig {
385 fn default() -> Self {
386 Self {
387 driver: AdapterDriver::default(),
388 redis: RedisAdapterConfig::default(),
389 cluster: RedisClusterAdapterConfig::default(),
390 nats: NatsAdapterConfig::default(),
391 buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
392 cluster_health: ClusterHealthConfig::default(),
393 enable_socket_counting: default_enable_socket_counting(),
394 }
395 }
396}
397
398#[derive(Debug, Clone, Serialize, Deserialize)]
399#[serde(default)]
400pub struct RedisAdapterConfig {
401 pub requests_timeout: u64,
402 pub prefix: String,
403 pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
404 pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
405 pub cluster_mode: bool,
406}
407
408#[derive(Debug, Clone, Serialize, Deserialize)]
409#[serde(default)]
410pub struct RedisClusterAdapterConfig {
411 pub nodes: Vec<String>,
412 pub prefix: String,
413 pub request_timeout_ms: u64,
414 pub use_connection_manager: bool,
415 #[serde(default)]
416 pub use_sharded_pubsub: bool,
417}
418
419#[derive(Debug, Clone, Serialize, Deserialize)]
420#[serde(default)]
421pub struct NatsAdapterConfig {
422 pub servers: Vec<String>,
423 pub prefix: String,
424 pub request_timeout_ms: u64,
425 pub username: Option<String>,
426 pub password: Option<String>,
427 pub token: Option<String>,
428 pub connection_timeout_ms: u64,
429 pub nodes_number: Option<u32>,
430}
431
432#[derive(Debug, Clone, Serialize, Deserialize, Default)]
433#[serde(default)]
434pub struct AppManagerConfig {
435 pub driver: AppManagerDriver,
436 pub array: ArrayConfig,
437 pub cache: CacheSettings,
438 pub scylladb: ScyllaDbSettings,
439}
440
441#[derive(Debug, Clone, Serialize, Deserialize, Default)]
442#[serde(default)]
443pub struct ArrayConfig {
444 pub apps: Vec<App>,
445}
446
447#[derive(Debug, Clone, Serialize, Deserialize)]
448#[serde(default)]
449pub struct CacheSettings {
450 pub enabled: bool,
451 pub ttl: u64,
452}
453
454#[derive(Debug, Clone, Serialize, Deserialize)]
455#[serde(default)]
456pub struct MemoryCacheOptions {
457 pub ttl: u64,
458 pub cleanup_interval: u64,
459 pub max_capacity: u64,
460}
461
462#[derive(Debug, Clone, Serialize, Deserialize)]
463#[serde(default)]
464pub struct CacheConfig {
465 pub driver: CacheDriver,
466 pub redis: RedisConfig,
467 pub memory: MemoryCacheOptions,
468}
469
470#[derive(Debug, Clone, Serialize, Deserialize, Default)]
471#[serde(default)]
472pub struct RedisConfig {
473 pub prefix: Option<String>,
474 pub url_override: Option<String>,
475 pub cluster_mode: bool,
476}
477
478#[derive(Debug, Clone, Serialize, Deserialize)]
479#[serde(default)]
480pub struct ChannelLimits {
481 pub max_name_length: u32,
482 pub cache_ttl: u64,
483}
484
485#[derive(Debug, Clone, Serialize, Deserialize)]
486#[serde(default)]
487pub struct CorsConfig {
488 pub credentials: bool,
489 pub origin: Vec<String>,
490 pub methods: Vec<String>,
491 pub allowed_headers: Vec<String>,
492}
493
494#[derive(Debug, Clone, Serialize, Deserialize, Default)]
495#[serde(default)]
496pub struct DatabaseConfig {
497 pub mysql: DatabaseConnection,
498 pub postgres: DatabaseConnection,
499 pub redis: RedisConnection,
500 pub dynamodb: DynamoDbSettings,
501 pub scylladb: ScyllaDbSettings,
502}
503
504#[derive(Debug, Clone, Serialize, Deserialize)]
505#[serde(default)]
506pub struct DatabaseConnection {
507 pub host: String,
508 pub port: u16,
509 pub username: String,
510 pub password: String,
511 pub database: String,
512 pub table_name: String,
513 pub connection_pool_size: u32,
514 pub pool_min: Option<u32>,
515 pub pool_max: Option<u32>,
516 pub cache_ttl: u64,
517 pub cache_cleanup_interval: u64,
518 pub cache_max_capacity: u64,
519}
520
521#[derive(Debug, Clone, Serialize, Deserialize)]
522#[serde(default)]
523pub struct RedisConnection {
524 pub host: String,
525 pub port: u16,
526 pub db: u32,
527 pub username: Option<String>,
528 pub password: Option<String>,
529 pub key_prefix: String,
530 pub sentinels: Vec<RedisSentinel>,
531 pub sentinel_password: Option<String>,
532 pub name: String,
533 pub cluster: RedisClusterConnection,
534 pub cluster_nodes: Vec<ClusterNode>,
536}
537
538#[derive(Debug, Clone, Serialize, Deserialize)]
539#[serde(default)]
540pub struct RedisSentinel {
541 pub host: String,
542 pub port: u16,
543}
544
545#[derive(Debug, Clone, Serialize, Deserialize, Default)]
546#[serde(default)]
547pub struct RedisClusterConnection {
548 pub nodes: Vec<ClusterNode>,
549 pub username: Option<String>,
550 pub password: Option<String>,
551 #[serde(alias = "useTLS")]
552 pub use_tls: bool,
553}
554
555impl RedisConnection {
556 pub fn is_sentinel_configured(&self) -> bool {
558 !self.sentinels.is_empty()
559 }
560
561 pub fn to_url(&self) -> String {
563 if self.is_sentinel_configured() {
564 self.build_sentinel_url()
565 } else {
566 self.build_standard_url()
567 }
568 }
569
570 fn build_standard_url(&self) -> String {
571 let mut url = String::from("redis://");
572
573 if let Some(ref username) = self.username {
574 url.push_str(username);
575 if let Some(ref password) = self.password {
576 url.push(':');
577 url.push_str(&urlencoding::encode(password));
578 }
579 url.push('@');
580 } else if let Some(ref password) = self.password {
581 url.push(':');
582 url.push_str(&urlencoding::encode(password));
583 url.push('@');
584 }
585
586 url.push_str(&self.host);
587 url.push(':');
588 url.push_str(&self.port.to_string());
589 url.push('/');
590 url.push_str(&self.db.to_string());
591
592 url
593 }
594
595 fn build_sentinel_url(&self) -> String {
596 let mut url = String::from("redis+sentinel://");
597
598 if let Some(ref sentinel_password) = self.sentinel_password {
599 url.push(':');
600 url.push_str(&urlencoding::encode(sentinel_password));
601 url.push('@');
602 }
603
604 let sentinel_hosts: Vec<String> = self
605 .sentinels
606 .iter()
607 .map(|s| format!("{}:{}", s.host, s.port))
608 .collect();
609 url.push_str(&sentinel_hosts.join(","));
610
611 url.push('/');
612 url.push_str(&self.name);
613 url.push('/');
614 url.push_str(&self.db.to_string());
615
616 let mut params = Vec::new();
617 if let Some(ref password) = self.password {
618 params.push(format!("password={}", urlencoding::encode(password)));
619 }
620 if let Some(ref username) = self.username {
621 params.push(format!("username={}", urlencoding::encode(username)));
622 }
623
624 if !params.is_empty() {
625 url.push('?');
626 url.push_str(¶ms.join("&"));
627 }
628
629 url
630 }
631
632 pub fn has_cluster_nodes(&self) -> bool {
635 !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
636 }
637
638 pub fn cluster_node_urls(&self) -> Vec<String> {
641 if !self.cluster.nodes.is_empty() {
642 return self.build_cluster_urls(&self.cluster.nodes);
643 }
644 self.build_cluster_urls(&self.cluster_nodes)
645 }
646
647 pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
650 self.build_cluster_urls(
651 &seeds
652 .iter()
653 .filter_map(|seed| ClusterNode::from_seed(seed))
654 .collect::<Vec<ClusterNode>>(),
655 )
656 }
657
658 fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
659 let username = self
660 .cluster
661 .username
662 .as_deref()
663 .or(self.username.as_deref());
664 let password = self
665 .cluster
666 .password
667 .as_deref()
668 .or(self.password.as_deref());
669 let use_tls = self.cluster.use_tls;
670
671 nodes
672 .iter()
673 .map(|node| node.to_url_with_options(use_tls, username, password))
674 .collect()
675 }
676}
677
678impl RedisSentinel {
679 pub fn to_host_port(&self) -> String {
680 format!("{}:{}", self.host, self.port)
681 }
682}
683
684#[derive(Debug, Clone, Serialize, Deserialize)]
685#[serde(default)]
686pub struct ClusterNode {
687 pub host: String,
688 pub port: u16,
689}
690
691impl ClusterNode {
692 pub fn to_url(&self) -> String {
693 self.to_url_with_options(false, None, None)
694 }
695
696 pub fn to_url_with_options(
697 &self,
698 use_tls: bool,
699 username: Option<&str>,
700 password: Option<&str>,
701 ) -> String {
702 let host = self.host.trim();
703
704 if host.starts_with("redis://") || host.starts_with("rediss://") {
705 if let Ok(parsed) = Url::parse(host)
706 && let Some(host_str) = parsed.host_str()
707 {
708 let scheme = parsed.scheme();
709 let port = parsed.port_or_known_default().unwrap_or(self.port);
710 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
711 let parsed_password = parsed.password();
712 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
713 let (effective_username, effective_password) = if has_embedded_auth {
714 (parsed_username, parsed_password)
715 } else {
716 (username, password)
717 };
718
719 return build_redis_url(
720 scheme,
721 host_str,
722 port,
723 effective_username,
724 effective_password,
725 );
726 }
727
728 let has_port = if let Some(bracket_pos) = host.rfind(']') {
730 host[bracket_pos..].contains(':')
731 } else {
732 host.split(':').count() >= 3
733 };
734 let base = if has_port {
735 host.to_string()
736 } else {
737 format!("{}:{}", host, self.port)
738 };
739
740 if let Ok(parsed) = Url::parse(&base) {
741 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
742 let parsed_password = parsed.password();
743 if let Some(host_str) = parsed.host_str() {
744 let port = parsed.port_or_known_default().unwrap_or(self.port);
745 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
746 let (effective_username, effective_password) = if has_embedded_auth {
747 (parsed_username, parsed_password)
748 } else {
749 (username, password)
750 };
751 return build_redis_url(
752 parsed.scheme(),
753 host_str,
754 port,
755 effective_username,
756 effective_password,
757 );
758 }
759 }
760 return base;
761 }
762
763 let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
764 let scheme = if use_tls { "rediss" } else { "redis" };
765 build_redis_url(
766 scheme,
767 &normalized_host,
768 normalized_port,
769 username,
770 password,
771 )
772 }
773
774 pub fn from_seed(seed: &str) -> Option<Self> {
775 let trimmed = seed.trim();
776 if trimmed.is_empty() {
777 return None;
778 }
779
780 if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
781 let port = Url::parse(trimmed)
782 .ok()
783 .and_then(|parsed| parsed.port_or_known_default())
784 .unwrap_or(6379);
785 return Some(Self {
786 host: trimmed.to_string(),
787 port,
788 });
789 }
790
791 let (host, port) = split_plain_host_and_port(trimmed, 6379);
792 Some(Self { host, port })
793 }
794}
795
796fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
797 let host = raw_host.trim();
798
799 if host.starts_with('[') {
801 if let Some(end_bracket) = host.find(']') {
802 let host_part = host[1..end_bracket].to_string();
803 let remainder = &host[end_bracket + 1..];
804 if let Some(port_str) = remainder.strip_prefix(':')
805 && let Ok(port) = port_str.parse::<u16>()
806 {
807 return (host_part, port);
808 }
809 return (host_part, default_port);
810 }
811 return (host.to_string(), default_port);
812 }
813
814 if host.matches(':').count() == 1
816 && let Some((host_part, port_part)) = host.rsplit_once(':')
817 && let Ok(port) = port_part.parse::<u16>()
818 {
819 return (host_part.to_string(), port);
820 }
821
822 (host.to_string(), default_port)
823}
824
825fn build_redis_url(
826 scheme: &str,
827 host: &str,
828 port: u16,
829 username: Option<&str>,
830 password: Option<&str>,
831) -> String {
832 let mut url = format!("{scheme}://");
833
834 if let Some(user) = username {
835 url.push_str(&urlencoding::encode(user));
836 if let Some(pass) = password {
837 url.push(':');
838 url.push_str(&urlencoding::encode(pass));
839 }
840 url.push('@');
841 } else if let Some(pass) = password {
842 url.push(':');
843 url.push_str(&urlencoding::encode(pass));
844 url.push('@');
845 }
846
847 if host.contains(':') && !host.starts_with('[') {
848 url.push('[');
849 url.push_str(host);
850 url.push(']');
851 } else {
852 url.push_str(host);
853 }
854 url.push(':');
855 url.push_str(&port.to_string());
856 url
857}
858
859#[derive(Debug, Clone, Serialize, Deserialize)]
860#[serde(default)]
861pub struct DatabasePooling {
862 pub enabled: bool,
863 pub min: u32,
864 pub max: u32,
865}
866
867#[derive(Debug, Clone, Serialize, Deserialize)]
868#[serde(default)]
869pub struct EventLimits {
870 pub max_channels_at_once: u32,
871 pub max_name_length: u32,
872 pub max_payload_in_kb: u32,
873 pub max_batch_size: u32,
874}
875
876#[derive(Debug, Clone, Serialize, Deserialize)]
877#[serde(default)]
878pub struct HttpApiConfig {
879 pub request_limit_in_mb: u32,
880 pub accept_traffic: AcceptTraffic,
881}
882
883#[derive(Debug, Clone, Serialize, Deserialize)]
884#[serde(default)]
885pub struct AcceptTraffic {
886 pub memory_threshold: f64,
887}
888
889#[derive(Debug, Clone, Serialize, Deserialize)]
890#[serde(default)]
891pub struct InstanceConfig {
892 pub process_id: String,
893}
894
895#[derive(Debug, Clone, Serialize, Deserialize)]
896#[serde(default)]
897pub struct MetricsConfig {
898 pub enabled: bool,
899 pub driver: MetricsDriver,
900 pub host: String,
901 pub prometheus: PrometheusConfig,
902 pub port: u16,
903}
904
905#[derive(Debug, Clone, Serialize, Deserialize)]
906#[serde(default)]
907pub struct PrometheusConfig {
908 pub prefix: String,
909}
910
911#[derive(Debug, Clone, Serialize, Deserialize)]
912#[serde(default)]
913pub struct LoggingConfig {
914 pub colors_enabled: bool,
915 pub include_target: bool,
916}
917
918#[derive(Debug, Clone, Serialize, Deserialize)]
919#[serde(default)]
920pub struct PresenceConfig {
921 pub max_members_per_channel: u32,
922 pub max_member_size_in_kb: u32,
923}
924
925#[derive(Debug, Clone, Serialize, Deserialize)]
928#[serde(default)]
929pub struct WebSocketConfig {
930 pub max_messages: Option<usize>,
931 pub max_bytes: Option<usize>,
932 pub disconnect_on_buffer_full: bool,
933 pub max_message_size: usize,
934 pub max_frame_size: usize,
935 pub write_buffer_size: usize,
936 pub max_backpressure: usize,
937 pub auto_ping: bool,
938 pub ping_interval: u32,
939 pub idle_timeout: u32,
940 pub compression: String,
941}
942
943impl Default for WebSocketConfig {
944 fn default() -> Self {
945 Self {
946 max_messages: Some(1000),
947 max_bytes: None,
948 disconnect_on_buffer_full: true,
949 max_message_size: 64 * 1024 * 1024,
950 max_frame_size: 16 * 1024 * 1024,
951 write_buffer_size: 16 * 1024,
952 max_backpressure: 1024 * 1024,
953 auto_ping: true,
954 ping_interval: 30,
955 idle_timeout: 120,
956 compression: "disabled".to_string(),
957 }
958 }
959}
960
961impl WebSocketConfig {
962 pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
964 use crate::websocket::{BufferLimit, WebSocketBufferConfig};
965
966 let limit = match (self.max_messages, self.max_bytes) {
967 (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
968 (Some(messages), None) => BufferLimit::Messages(messages),
969 (None, Some(bytes)) => BufferLimit::Bytes(bytes),
970 (None, None) => BufferLimit::Messages(1000),
971 };
972
973 WebSocketBufferConfig {
974 limit,
975 disconnect_on_full: self.disconnect_on_buffer_full,
976 }
977 }
978
979 pub fn to_sockudo_ws_config(
981 &self,
982 websocket_max_payload_kb: u32,
983 activity_timeout: u64,
984 ) -> sockudo_ws::Config {
985 use sockudo_ws::Compression;
986
987 let compression = match self.compression.to_lowercase().as_str() {
988 "dedicated" => Compression::Dedicated,
989 "shared" => Compression::Shared,
990 "window256b" => Compression::Window256B,
991 "window1kb" => Compression::Window1KB,
992 "window2kb" => Compression::Window2KB,
993 "window4kb" => Compression::Window4KB,
994 "window8kb" => Compression::Window8KB,
995 "window16kb" => Compression::Window16KB,
996 "window32kb" => Compression::Window32KB,
997 _ => Compression::Disabled,
998 };
999
1000 sockudo_ws::Config::builder()
1001 .max_payload_length(
1002 self.max_bytes
1003 .unwrap_or(websocket_max_payload_kb as usize * 1024),
1004 )
1005 .max_message_size(self.max_message_size)
1006 .max_frame_size(self.max_frame_size)
1007 .write_buffer_size(self.write_buffer_size)
1008 .max_backpressure(self.max_backpressure)
1009 .idle_timeout(self.idle_timeout)
1010 .auto_ping(self.auto_ping)
1011 .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1012 .compression(compression)
1013 .build()
1014 }
1015}
1016
1017#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1018#[serde(default)]
1019pub struct QueueConfig {
1020 pub driver: QueueDriver,
1021 pub redis: RedisQueueConfig,
1022 pub redis_cluster: RedisClusterQueueConfig,
1023 pub sqs: SqsQueueConfig,
1024 pub sns: SnsQueueConfig,
1025}
1026
1027#[derive(Debug, Clone, Serialize, Deserialize)]
1028#[serde(default)]
1029pub struct RedisQueueConfig {
1030 pub concurrency: u32,
1031 pub prefix: Option<String>,
1032 pub url_override: Option<String>,
1033 pub cluster_mode: bool,
1034}
1035
1036#[derive(Clone, Debug, Serialize, Deserialize)]
1037#[serde(default)]
1038pub struct RateLimit {
1039 pub max_requests: u32,
1040 pub window_seconds: u64,
1041 pub identifier: Option<String>,
1042 pub trust_hops: Option<u32>,
1043}
1044
1045#[derive(Debug, Clone, Serialize, Deserialize)]
1046#[serde(default)]
1047pub struct RateLimiterConfig {
1048 pub enabled: bool,
1049 pub driver: CacheDriver,
1050 pub api_rate_limit: RateLimit,
1051 pub websocket_rate_limit: RateLimit,
1052 pub redis: RedisConfig,
1053}
1054
1055#[derive(Debug, Clone, Serialize, Deserialize)]
1056#[serde(default)]
1057pub struct SslConfig {
1058 pub enabled: bool,
1059 pub cert_path: String,
1060 pub key_path: String,
1061 pub passphrase: Option<String>,
1062 pub ca_path: Option<String>,
1063 pub redirect_http: bool,
1064 pub http_port: Option<u16>,
1065}
1066
1067#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1068#[serde(default)]
1069pub struct WebhooksConfig {
1070 pub batching: BatchingConfig,
1071}
1072
1073#[derive(Debug, Clone, Serialize, Deserialize)]
1074#[serde(default)]
1075pub struct BatchingConfig {
1076 pub enabled: bool,
1077 pub duration: u64,
1078 pub size: usize,
1079}
1080
1081#[derive(Debug, Clone, Serialize, Deserialize)]
1082#[serde(default)]
1083pub struct ClusterHealthConfig {
1084 pub enabled: bool,
1085 pub heartbeat_interval_ms: u64,
1086 pub node_timeout_ms: u64,
1087 pub cleanup_interval_ms: u64,
1088}
1089
1090#[derive(Debug, Clone, Serialize, Deserialize)]
1091#[serde(default)]
1092pub struct UnixSocketConfig {
1093 pub enabled: bool,
1094 pub path: String,
1095 #[serde(deserialize_with = "deserialize_octal_permission")]
1096 pub permission_mode: u32,
1097}
1098
1099#[derive(Debug, Clone, Serialize, Deserialize)]
1100#[serde(default)]
1101pub struct DeltaCompressionOptionsConfig {
1102 pub enabled: bool,
1103 pub algorithm: String,
1104 pub full_message_interval: u32,
1105 pub min_message_size: usize,
1106 pub max_state_age_secs: u64,
1107 pub max_channel_states_per_socket: usize,
1108 pub max_conflation_states_per_channel: Option<usize>,
1109 pub conflation_key_path: Option<String>,
1110 pub cluster_coordination: bool,
1111 pub omit_delta_algorithm: bool,
1112}
1113
1114#[derive(Debug, Clone, Serialize, Deserialize)]
1116#[serde(default)]
1117pub struct CleanupConfig {
1118 pub queue_buffer_size: usize,
1119 pub batch_size: usize,
1120 pub batch_timeout_ms: u64,
1121 pub worker_threads: WorkerThreadsConfig,
1122 pub max_retry_attempts: u32,
1123 pub async_enabled: bool,
1124 pub fallback_to_sync: bool,
1125}
1126
1127#[derive(Debug, Clone)]
1129pub enum WorkerThreadsConfig {
1130 Auto,
1131 Fixed(usize),
1132}
1133
1134impl serde::Serialize for WorkerThreadsConfig {
1135 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1136 where
1137 S: serde::Serializer,
1138 {
1139 match self {
1140 WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1141 WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1142 }
1143 }
1144}
1145
1146impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1147 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1148 where
1149 D: serde::Deserializer<'de>,
1150 {
1151 use serde::de;
1152 struct WorkerThreadsVisitor;
1153 impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1154 type Value = WorkerThreadsConfig;
1155
1156 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1157 formatter.write_str(r#""auto" or a positive integer"#)
1158 }
1159
1160 fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1161 if value.eq_ignore_ascii_case("auto") {
1162 Ok(WorkerThreadsConfig::Auto)
1163 } else if let Ok(n) = value.parse::<usize>() {
1164 Ok(WorkerThreadsConfig::Fixed(n))
1165 } else {
1166 Err(E::custom(format!(
1167 "expected 'auto' or a number, got '{value}'"
1168 )))
1169 }
1170 }
1171
1172 fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1173 Ok(WorkerThreadsConfig::Fixed(value as usize))
1174 }
1175
1176 fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1177 if value >= 0 {
1178 Ok(WorkerThreadsConfig::Fixed(value as usize))
1179 } else {
1180 Err(E::custom("worker_threads must be non-negative"))
1181 }
1182 }
1183 }
1184 deserializer.deserialize_any(WorkerThreadsVisitor)
1185 }
1186}
1187
1188impl Default for CleanupConfig {
1189 fn default() -> Self {
1190 Self {
1191 queue_buffer_size: 1024,
1192 batch_size: 64,
1193 batch_timeout_ms: 100,
1194 worker_threads: WorkerThreadsConfig::Auto,
1195 max_retry_attempts: 3,
1196 async_enabled: true,
1197 fallback_to_sync: true,
1198 }
1199 }
1200}
1201
1202impl CleanupConfig {
1203 pub fn validate(&self) -> Result<(), String> {
1204 if self.queue_buffer_size == 0 {
1205 return Err("queue_buffer_size must be greater than 0".to_string());
1206 }
1207 if self.batch_size == 0 {
1208 return Err("batch_size must be greater than 0".to_string());
1209 }
1210 if self.batch_timeout_ms == 0 {
1211 return Err("batch_timeout_ms must be greater than 0".to_string());
1212 }
1213 if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1214 && n == 0
1215 {
1216 return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1217 }
1218 Ok(())
1219 }
1220}
1221
1222#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1223#[serde(default)]
1224pub struct TagFilteringConfig {
1225 #[serde(default)]
1226 pub enabled: bool,
1227 #[serde(default = "default_true")]
1228 pub enable_tags: bool,
1229}
1230
1231fn default_true() -> bool {
1232 true
1233}
1234
1235impl Default for ServerOptions {
1238 fn default() -> Self {
1239 Self {
1240 adapter: AdapterConfig::default(),
1241 app_manager: AppManagerConfig::default(),
1242 cache: CacheConfig::default(),
1243 channel_limits: ChannelLimits::default(),
1244 cors: CorsConfig::default(),
1245 database: DatabaseConfig::default(),
1246 database_pooling: DatabasePooling::default(),
1247 debug: false,
1248 tag_filtering: TagFilteringConfig::default(),
1249 event_limits: EventLimits::default(),
1250 host: "0.0.0.0".to_string(),
1251 http_api: HttpApiConfig::default(),
1252 instance: InstanceConfig::default(),
1253 logging: None,
1254 metrics: MetricsConfig::default(),
1255 mode: "production".to_string(),
1256 port: 6001,
1257 path_prefix: "/".to_string(),
1258 presence: PresenceConfig::default(),
1259 queue: QueueConfig::default(),
1260 rate_limiter: RateLimiterConfig::default(),
1261 shutdown_grace_period: 10,
1262 ssl: SslConfig::default(),
1263 user_authentication_timeout: 3600,
1264 webhooks: WebhooksConfig::default(),
1265 websocket_max_payload_kb: 64,
1266 cleanup: CleanupConfig::default(),
1267 activity_timeout: 120,
1268 cluster_health: ClusterHealthConfig::default(),
1269 unix_socket: UnixSocketConfig::default(),
1270 delta_compression: DeltaCompressionOptionsConfig::default(),
1271 websocket: WebSocketConfig::default(),
1272 }
1273 }
1274}
1275
1276impl Default for SqsQueueConfig {
1277 fn default() -> Self {
1278 Self {
1279 region: "us-east-1".to_string(),
1280 queue_url_prefix: None,
1281 visibility_timeout: 30,
1282 endpoint_url: None,
1283 max_messages: 10,
1284 wait_time_seconds: 5,
1285 concurrency: 5,
1286 fifo: false,
1287 message_group_id: Some("default".to_string()),
1288 }
1289 }
1290}
1291
1292impl Default for SnsQueueConfig {
1293 fn default() -> Self {
1294 Self {
1295 region: "us-east-1".to_string(),
1296 topic_arn: String::new(),
1297 endpoint_url: None,
1298 }
1299 }
1300}
1301
1302impl Default for RedisAdapterConfig {
1303 fn default() -> Self {
1304 Self {
1305 requests_timeout: 5000,
1306 prefix: "sockudo_adapter:".to_string(),
1307 redis_pub_options: AHashMap::new(),
1308 redis_sub_options: AHashMap::new(),
1309 cluster_mode: false,
1310 }
1311 }
1312}
1313
1314impl Default for RedisClusterAdapterConfig {
1315 fn default() -> Self {
1316 Self {
1317 nodes: vec![],
1318 prefix: "sockudo_adapter:".to_string(),
1319 request_timeout_ms: 1000,
1320 use_connection_manager: true,
1321 use_sharded_pubsub: false,
1322 }
1323 }
1324}
1325
1326impl Default for NatsAdapterConfig {
1327 fn default() -> Self {
1328 Self {
1329 servers: vec!["nats://localhost:4222".to_string()],
1330 prefix: "sockudo_adapter:".to_string(),
1331 request_timeout_ms: 5000,
1332 username: None,
1333 password: None,
1334 token: None,
1335 connection_timeout_ms: 5000,
1336 nodes_number: None,
1337 }
1338 }
1339}
1340
1341impl Default for CacheSettings {
1342 fn default() -> Self {
1343 Self {
1344 enabled: true,
1345 ttl: 300,
1346 }
1347 }
1348}
1349
1350impl Default for MemoryCacheOptions {
1351 fn default() -> Self {
1352 Self {
1353 ttl: 300,
1354 cleanup_interval: 60,
1355 max_capacity: 10000,
1356 }
1357 }
1358}
1359
1360impl Default for CacheConfig {
1361 fn default() -> Self {
1362 Self {
1363 driver: CacheDriver::default(),
1364 redis: RedisConfig {
1365 prefix: Some("sockudo_cache:".to_string()),
1366 url_override: None,
1367 cluster_mode: false,
1368 },
1369 memory: MemoryCacheOptions::default(),
1370 }
1371 }
1372}
1373
1374impl Default for ChannelLimits {
1375 fn default() -> Self {
1376 Self {
1377 max_name_length: 200,
1378 cache_ttl: 3600,
1379 }
1380 }
1381}
1382impl Default for CorsConfig {
1383 fn default() -> Self {
1384 Self {
1385 credentials: true,
1386 origin: vec!["*".to_string()],
1387 methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
1388 allowed_headers: vec![
1389 "Authorization".to_string(),
1390 "Content-Type".to_string(),
1391 "X-Requested-With".to_string(),
1392 "Accept".to_string(),
1393 ],
1394 }
1395 }
1396}
1397
1398impl Default for DatabaseConnection {
1399 fn default() -> Self {
1400 Self {
1401 host: "localhost".to_string(),
1402 port: 3306,
1403 username: "root".to_string(),
1404 password: "".to_string(),
1405 database: "sockudo".to_string(),
1406 table_name: "applications".to_string(),
1407 connection_pool_size: 10,
1408 pool_min: None,
1409 pool_max: None,
1410 cache_ttl: 300,
1411 cache_cleanup_interval: 60,
1412 cache_max_capacity: 100,
1413 }
1414 }
1415}
1416
1417impl Default for RedisConnection {
1418 fn default() -> Self {
1419 Self {
1420 host: "127.0.0.1".to_string(),
1421 port: 6379,
1422 db: 0,
1423 username: None,
1424 password: None,
1425 key_prefix: "sockudo:".to_string(),
1426 sentinels: Vec::new(),
1427 sentinel_password: None,
1428 name: "mymaster".to_string(),
1429 cluster: RedisClusterConnection::default(),
1430 cluster_nodes: Vec::new(),
1431 }
1432 }
1433}
1434
1435impl Default for RedisSentinel {
1436 fn default() -> Self {
1437 Self {
1438 host: "localhost".to_string(),
1439 port: 26379,
1440 }
1441 }
1442}
1443
1444impl Default for ClusterNode {
1445 fn default() -> Self {
1446 Self {
1447 host: "127.0.0.1".to_string(),
1448 port: 7000,
1449 }
1450 }
1451}
1452
1453impl Default for DatabasePooling {
1454 fn default() -> Self {
1455 Self {
1456 enabled: true,
1457 min: 2,
1458 max: 10,
1459 }
1460 }
1461}
1462
1463impl Default for EventLimits {
1464 fn default() -> Self {
1465 Self {
1466 max_channels_at_once: 100,
1467 max_name_length: 200,
1468 max_payload_in_kb: 100,
1469 max_batch_size: 10,
1470 }
1471 }
1472}
1473
1474impl Default for HttpApiConfig {
1475 fn default() -> Self {
1476 Self {
1477 request_limit_in_mb: 10,
1478 accept_traffic: AcceptTraffic::default(),
1479 }
1480 }
1481}
1482
1483impl Default for AcceptTraffic {
1484 fn default() -> Self {
1485 Self {
1486 memory_threshold: 0.90,
1487 }
1488 }
1489}
1490
1491impl Default for InstanceConfig {
1492 fn default() -> Self {
1493 Self {
1494 process_id: uuid::Uuid::new_v4().to_string(),
1495 }
1496 }
1497}
1498
1499impl Default for LoggingConfig {
1500 fn default() -> Self {
1501 Self {
1502 colors_enabled: true,
1503 include_target: true,
1504 }
1505 }
1506}
1507
1508impl Default for MetricsConfig {
1509 fn default() -> Self {
1510 Self {
1511 enabled: true,
1512 driver: MetricsDriver::default(),
1513 host: "0.0.0.0".to_string(),
1514 prometheus: PrometheusConfig::default(),
1515 port: 9601,
1516 }
1517 }
1518}
1519
1520impl Default for PrometheusConfig {
1521 fn default() -> Self {
1522 Self {
1523 prefix: "sockudo_".to_string(),
1524 }
1525 }
1526}
1527
1528impl Default for PresenceConfig {
1529 fn default() -> Self {
1530 Self {
1531 max_members_per_channel: 100,
1532 max_member_size_in_kb: 2,
1533 }
1534 }
1535}
1536
1537impl Default for RedisQueueConfig {
1538 fn default() -> Self {
1539 Self {
1540 concurrency: 5,
1541 prefix: Some("sockudo_queue:".to_string()),
1542 url_override: None,
1543 cluster_mode: false,
1544 }
1545 }
1546}
1547
1548impl Default for RateLimit {
1549 fn default() -> Self {
1550 Self {
1551 max_requests: 60,
1552 window_seconds: 60,
1553 identifier: Some("default".to_string()),
1554 trust_hops: Some(0),
1555 }
1556 }
1557}
1558
1559impl Default for RateLimiterConfig {
1560 fn default() -> Self {
1561 Self {
1562 enabled: true,
1563 driver: CacheDriver::Memory,
1564 api_rate_limit: RateLimit {
1565 max_requests: 100,
1566 window_seconds: 60,
1567 identifier: Some("api".to_string()),
1568 trust_hops: Some(0),
1569 },
1570 websocket_rate_limit: RateLimit {
1571 max_requests: 20,
1572 window_seconds: 60,
1573 identifier: Some("websocket_connect".to_string()),
1574 trust_hops: Some(0),
1575 },
1576 redis: RedisConfig {
1577 prefix: Some("sockudo_rl:".to_string()),
1578 url_override: None,
1579 cluster_mode: false,
1580 },
1581 }
1582 }
1583}
1584
1585impl Default for SslConfig {
1586 fn default() -> Self {
1587 Self {
1588 enabled: false,
1589 cert_path: "".to_string(),
1590 key_path: "".to_string(),
1591 passphrase: None,
1592 ca_path: None,
1593 redirect_http: false,
1594 http_port: Some(80),
1595 }
1596 }
1597}
1598
1599impl Default for BatchingConfig {
1600 fn default() -> Self {
1601 Self {
1602 enabled: true,
1603 duration: 50,
1604 size: 100,
1605 }
1606 }
1607}
1608
1609impl Default for ClusterHealthConfig {
1610 fn default() -> Self {
1611 Self {
1612 enabled: true,
1613 heartbeat_interval_ms: 10000,
1614 node_timeout_ms: 30000,
1615 cleanup_interval_ms: 10000,
1616 }
1617 }
1618}
1619
1620impl Default for UnixSocketConfig {
1621 fn default() -> Self {
1622 Self {
1623 enabled: false,
1624 path: "/var/run/sockudo/sockudo.sock".to_string(),
1625 permission_mode: 0o660,
1626 }
1627 }
1628}
1629
1630impl Default for DeltaCompressionOptionsConfig {
1631 fn default() -> Self {
1632 Self {
1633 enabled: true,
1634 algorithm: "fossil".to_string(),
1635 full_message_interval: 10,
1636 min_message_size: 100,
1637 max_state_age_secs: 300,
1638 max_channel_states_per_socket: 100,
1639 max_conflation_states_per_channel: Some(100),
1640 conflation_key_path: None,
1641 cluster_coordination: false,
1642 omit_delta_algorithm: false,
1643 }
1644 }
1645}
1646
1647impl ClusterHealthConfig {
1648 pub fn validate(&self) -> Result<(), String> {
1649 if self.heartbeat_interval_ms == 0 {
1650 return Err("heartbeat_interval_ms must be greater than 0".to_string());
1651 }
1652 if self.node_timeout_ms == 0 {
1653 return Err("node_timeout_ms must be greater than 0".to_string());
1654 }
1655 if self.cleanup_interval_ms == 0 {
1656 return Err("cleanup_interval_ms must be greater than 0".to_string());
1657 }
1658
1659 if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
1660 return Err(format!(
1661 "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
1662 self.heartbeat_interval_ms,
1663 self.node_timeout_ms,
1664 self.node_timeout_ms / 3
1665 ));
1666 }
1667
1668 if self.cleanup_interval_ms > self.node_timeout_ms {
1669 return Err(format!(
1670 "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
1671 self.cleanup_interval_ms, self.node_timeout_ms
1672 ));
1673 }
1674
1675 Ok(())
1676 }
1677}
1678
1679impl ServerOptions {
1680 pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
1681 let content = tokio::fs::read_to_string(path).await?;
1682 let options: Self = sonic_rs::from_str(&content)?;
1683 Ok(options)
1684 }
1685
1686 pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
1687 if let Ok(mode) = std::env::var("ENVIRONMENT") {
1689 self.mode = mode;
1690 }
1691 self.debug = parse_bool_env("DEBUG_MODE", self.debug);
1692 if parse_bool_env("DEBUG", false) {
1693 self.debug = true;
1694 info!("DEBUG environment variable forces debug mode ON");
1695 }
1696
1697 self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
1698
1699 if let Ok(host) = std::env::var("HOST") {
1700 self.host = host;
1701 }
1702 self.port = parse_env::<u16>("PORT", self.port);
1703 self.shutdown_grace_period =
1704 parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
1705 self.user_authentication_timeout = parse_env::<u64>(
1706 "USER_AUTHENTICATION_TIMEOUT",
1707 self.user_authentication_timeout,
1708 );
1709 self.websocket_max_payload_kb =
1710 parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
1711 if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
1712 self.instance.process_id = id;
1713 }
1714
1715 if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
1717 self.adapter.driver =
1718 parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
1719 }
1720 self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
1721 "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
1722 self.adapter.buffer_multiplier_per_cpu,
1723 );
1724 self.adapter.enable_socket_counting = parse_env::<bool>(
1725 "ADAPTER_ENABLE_SOCKET_COUNTING",
1726 self.adapter.enable_socket_counting,
1727 );
1728 if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
1729 self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
1730 }
1731 if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
1732 self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
1733 }
1734 if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
1735 self.app_manager.driver =
1736 parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
1737 }
1738 if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
1739 self.rate_limiter.driver = parse_driver_enum(
1740 driver_str,
1741 self.rate_limiter.driver.clone(),
1742 "RateLimiter Backend",
1743 );
1744 }
1745
1746 if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
1748 self.database.redis.host = host;
1749 }
1750 self.database.redis.port =
1751 parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
1752 if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
1753 self.database.redis.username = if username.is_empty() {
1754 None
1755 } else {
1756 Some(username)
1757 };
1758 }
1759 if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
1760 self.database.redis.password = Some(password);
1761 }
1762 self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
1763 if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
1764 self.database.redis.key_prefix = prefix;
1765 }
1766 if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
1767 self.database.redis.cluster.username = if cluster_username.is_empty() {
1768 None
1769 } else {
1770 Some(cluster_username)
1771 };
1772 }
1773 if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
1774 self.database.redis.cluster.password = Some(cluster_password);
1775 }
1776 self.database.redis.cluster.use_tls = parse_bool_env(
1777 "DATABASE_REDIS_CLUSTER_USE_TLS",
1778 self.database.redis.cluster.use_tls,
1779 );
1780
1781 if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
1783 self.database.mysql.host = host;
1784 }
1785 self.database.mysql.port =
1786 parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
1787 if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
1788 self.database.mysql.username = user;
1789 }
1790 if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
1791 self.database.mysql.password = pass;
1792 }
1793 if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
1794 self.database.mysql.database = db;
1795 }
1796 if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
1797 self.database.mysql.table_name = table;
1798 }
1799 override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
1800
1801 if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
1803 self.database.postgres.host = host;
1804 }
1805 self.database.postgres.port =
1806 parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
1807 if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
1808 self.database.postgres.username = user;
1809 }
1810 if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
1811 self.database.postgres.password = pass;
1812 }
1813 if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
1814 self.database.postgres.database = db;
1815 }
1816 override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
1817
1818 if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
1820 self.database.dynamodb.region = region;
1821 }
1822 if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
1823 self.database.dynamodb.table_name = table;
1824 }
1825 if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
1826 self.database.dynamodb.endpoint_url = Some(endpoint);
1827 }
1828 if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
1829 self.database.dynamodb.aws_access_key_id = Some(key_id);
1830 }
1831 if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
1832 self.database.dynamodb.aws_secret_access_key = Some(secret);
1833 }
1834
1835 let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
1837 let node_list: Vec<String> = nodes
1838 .split(',')
1839 .map(|s| s.trim())
1840 .filter(|s| !s.is_empty())
1841 .map(ToString::to_string)
1842 .collect();
1843
1844 options.adapter.cluster.nodes = node_list.clone();
1845 options.queue.redis_cluster.nodes = node_list.clone();
1846
1847 let parsed_nodes: Vec<ClusterNode> = node_list
1848 .iter()
1849 .filter_map(|seed| ClusterNode::from_seed(seed))
1850 .collect();
1851 options.database.redis.cluster.nodes = parsed_nodes.clone();
1852 options.database.redis.cluster_nodes = parsed_nodes;
1853 };
1854
1855 if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
1856 apply_redis_cluster_nodes(self, &nodes);
1857 }
1858 if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
1859 apply_redis_cluster_nodes(self, &nodes);
1860 }
1861 self.queue.redis_cluster.concurrency = parse_env::<u32>(
1862 "REDIS_CLUSTER_QUEUE_CONCURRENCY",
1863 self.queue.redis_cluster.concurrency,
1864 );
1865 if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
1866 self.queue.redis_cluster.prefix = Some(prefix);
1867 }
1868
1869 self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
1871 if let Ok(val) = std::env::var("SSL_CERT_PATH") {
1872 self.ssl.cert_path = val;
1873 }
1874 if let Ok(val) = std::env::var("SSL_KEY_PATH") {
1875 self.ssl.key_path = val;
1876 }
1877 self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
1878 if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
1879 self.ssl.http_port = Some(port);
1880 }
1881
1882 self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
1884 if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
1885 self.unix_socket.path = path;
1886 }
1887 if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
1888 if mode_str.chars().all(|c| c.is_digit(8)) {
1889 if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
1890 if mode <= 0o777 {
1891 self.unix_socket.permission_mode = mode;
1892 } else {
1893 warn!(
1894 "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
1895 mode_str, self.unix_socket.permission_mode
1896 );
1897 }
1898 } else {
1899 warn!(
1900 "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
1901 mode_str, self.unix_socket.permission_mode
1902 );
1903 }
1904 } else {
1905 warn!(
1906 "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
1907 mode_str, self.unix_socket.permission_mode
1908 );
1909 }
1910 }
1911
1912 if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
1914 self.metrics.driver =
1915 parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
1916 }
1917 self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
1918 if let Ok(val) = std::env::var("METRICS_HOST") {
1919 self.metrics.host = val;
1920 }
1921 self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
1922 if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
1923 self.metrics.prometheus.prefix = val;
1924 }
1925
1926 self.rate_limiter.enabled =
1928 parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
1929 self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
1930 "RATE_LIMITER_API_MAX_REQUESTS",
1931 self.rate_limiter.api_rate_limit.max_requests,
1932 );
1933 self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
1934 "RATE_LIMITER_API_WINDOW_SECONDS",
1935 self.rate_limiter.api_rate_limit.window_seconds,
1936 );
1937 if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
1938 self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
1939 }
1940 self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
1941 "RATE_LIMITER_WS_MAX_REQUESTS",
1942 self.rate_limiter.websocket_rate_limit.max_requests,
1943 );
1944 self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
1945 "RATE_LIMITER_WS_WINDOW_SECONDS",
1946 self.rate_limiter.websocket_rate_limit.window_seconds,
1947 );
1948 if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
1949 self.rate_limiter.redis.prefix = Some(prefix);
1950 }
1951
1952 self.queue.redis.concurrency =
1954 parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
1955 if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
1956 self.queue.redis.prefix = Some(prefix);
1957 }
1958
1959 if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
1961 self.queue.sqs.region = region;
1962 }
1963 self.queue.sqs.visibility_timeout = parse_env::<i32>(
1964 "QUEUE_SQS_VISIBILITY_TIMEOUT",
1965 self.queue.sqs.visibility_timeout,
1966 );
1967 self.queue.sqs.max_messages =
1968 parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
1969 self.queue.sqs.wait_time_seconds = parse_env::<i32>(
1970 "QUEUE_SQS_WAIT_TIME_SECONDS",
1971 self.queue.sqs.wait_time_seconds,
1972 );
1973 self.queue.sqs.concurrency =
1974 parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
1975 self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
1976 if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
1977 self.queue.sqs.endpoint_url = Some(endpoint);
1978 }
1979
1980 if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
1982 self.queue.sns.region = region;
1983 }
1984 if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
1985 self.queue.sns.topic_arn = topic_arn;
1986 }
1987 if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
1988 self.queue.sns.endpoint_url = Some(endpoint);
1989 }
1990
1991 self.webhooks.batching.enabled =
1993 parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
1994 self.webhooks.batching.duration =
1995 parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
1996 self.webhooks.batching.size =
1997 parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
1998
1999 if let Ok(servers) = std::env::var("NATS_SERVERS") {
2001 self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2002 }
2003 if let Ok(user) = std::env::var("NATS_USERNAME") {
2004 self.adapter.nats.username = Some(user);
2005 }
2006 if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2007 self.adapter.nats.password = Some(pass);
2008 }
2009 if let Ok(token) = std::env::var("NATS_TOKEN") {
2010 self.adapter.nats.token = Some(token);
2011 }
2012 self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2013 "NATS_CONNECTION_TIMEOUT_MS",
2014 self.adapter.nats.connection_timeout_ms,
2015 );
2016 self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2017 "NATS_REQUEST_TIMEOUT_MS",
2018 self.adapter.nats.request_timeout_ms,
2019 );
2020
2021 if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2023 self.cors.origin = origins.split(',').map(|s| s.trim().to_string()).collect();
2024 }
2025 if let Ok(methods) = std::env::var("CORS_METHODS") {
2026 self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2027 }
2028 if let Ok(headers) = std::env::var("CORS_HEADERS") {
2029 self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2030 }
2031 self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2032
2033 self.database_pooling.enabled =
2035 parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
2036 if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
2037 self.database_pooling.min = min;
2038 }
2039 if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
2040 self.database_pooling.max = max;
2041 }
2042
2043 if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
2044 self.database.mysql.connection_pool_size = pool_size;
2045 self.database.postgres.connection_pool_size = pool_size;
2046 }
2047 if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
2048 self.app_manager.cache.ttl = cache_ttl;
2049 self.channel_limits.cache_ttl = cache_ttl;
2050 self.database.mysql.cache_ttl = cache_ttl;
2051 self.database.postgres.cache_ttl = cache_ttl;
2052 self.cache.memory.ttl = cache_ttl;
2053 }
2054 if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
2055 self.database.mysql.cache_cleanup_interval = cleanup_interval;
2056 self.database.postgres.cache_cleanup_interval = cleanup_interval;
2057 self.cache.memory.cleanup_interval = cleanup_interval;
2058 }
2059 if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
2060 self.database.mysql.cache_max_capacity = max_capacity;
2061 self.database.postgres.cache_max_capacity = max_capacity;
2062 self.cache.memory.max_capacity = max_capacity;
2063 }
2064
2065 let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
2066 let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
2067 let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
2068 let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
2069
2070 if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
2071 (default_app_id, default_app_key, default_app_secret)
2072 && default_app_enabled
2073 {
2074 let default_app = App {
2075 id: app_id,
2076 key: app_key,
2077 secret: app_secret,
2078 enable_client_messages: parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false),
2079 enabled: default_app_enabled,
2080 max_connections: parse_env::<u32>("SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS", 100),
2081 max_client_events_per_second: parse_env::<u32>(
2082 "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
2083 100,
2084 ),
2085 max_read_requests_per_second: Some(parse_env::<u32>(
2086 "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
2087 100,
2088 )),
2089 max_presence_members_per_channel: Some(parse_env::<u32>(
2090 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
2091 100,
2092 )),
2093 max_presence_member_size_in_kb: Some(parse_env::<u32>(
2094 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
2095 100,
2096 )),
2097 max_channel_name_length: Some(parse_env::<u32>(
2098 "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
2099 100,
2100 )),
2101 max_event_channels_at_once: Some(parse_env::<u32>(
2102 "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
2103 100,
2104 )),
2105 max_event_name_length: Some(parse_env::<u32>(
2106 "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
2107 100,
2108 )),
2109 max_event_payload_in_kb: Some(parse_env::<u32>(
2110 "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
2111 100,
2112 )),
2113 max_event_batch_size: Some(parse_env::<u32>(
2114 "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
2115 100,
2116 )),
2117 enable_user_authentication: Some(parse_bool_env(
2118 "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
2119 false,
2120 )),
2121 webhooks: None,
2122 max_backend_events_per_second: Some(parse_env::<u32>(
2123 "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
2124 100,
2125 )),
2126 enable_watchlist_events: Some(parse_bool_env(
2127 "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
2128 false,
2129 )),
2130 allowed_origins: {
2131 if let Ok(origins_str) = std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS") {
2132 if !origins_str.is_empty() {
2133 Some(
2134 origins_str
2135 .split(',')
2136 .map(|s| s.trim().to_string())
2137 .collect(),
2138 )
2139 } else {
2140 None
2141 }
2142 } else {
2143 None
2144 }
2145 },
2146 channel_delta_compression: None,
2147 };
2148
2149 self.app_manager.array.apps.push(default_app);
2150 info!("Successfully registered default app from env");
2151 }
2152
2153 if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
2155 info!("Applying REDIS_URL environment variable override");
2156
2157 let redis_url_json = sonic_rs::json!(redis_url_env);
2158
2159 self.adapter
2160 .redis
2161 .redis_pub_options
2162 .insert("url".to_string(), redis_url_json.clone());
2163 self.adapter
2164 .redis
2165 .redis_sub_options
2166 .insert("url".to_string(), redis_url_json);
2167
2168 self.cache.redis.url_override = Some(redis_url_env.clone());
2169 self.queue.redis.url_override = Some(redis_url_env.clone());
2170 self.rate_limiter.redis.url_override = Some(redis_url_env);
2171 }
2172
2173 let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
2175 let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
2176 if has_colors_env || has_target_env {
2177 let logging_config = self.logging.get_or_insert_with(Default::default);
2178 if has_colors_env {
2179 logging_config.colors_enabled =
2180 parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
2181 }
2182 if has_target_env {
2183 logging_config.include_target =
2184 parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
2185 }
2186 }
2187
2188 self.cleanup.async_enabled =
2190 parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
2191 self.cleanup.fallback_to_sync =
2192 parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
2193 self.cleanup.queue_buffer_size =
2194 parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
2195 self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
2196 self.cleanup.batch_timeout_ms =
2197 parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
2198 if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
2199 self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
2200 WorkerThreadsConfig::Auto
2201 } else if let Ok(n) = worker_threads_str.parse::<usize>() {
2202 WorkerThreadsConfig::Fixed(n)
2203 } else {
2204 warn!(
2205 "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
2206 worker_threads_str
2207 );
2208 self.cleanup.worker_threads.clone()
2209 };
2210 }
2211 self.cleanup.max_retry_attempts = parse_env::<u32>(
2212 "CLEANUP_MAX_RETRY_ATTEMPTS",
2213 self.cleanup.max_retry_attempts,
2214 );
2215
2216 self.cluster_health.enabled =
2218 parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
2219 self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
2220 "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
2221 self.cluster_health.heartbeat_interval_ms,
2222 );
2223 self.cluster_health.node_timeout_ms = parse_env::<u64>(
2224 "CLUSTER_HEALTH_NODE_TIMEOUT",
2225 self.cluster_health.node_timeout_ms,
2226 );
2227 self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
2228 "CLUSTER_HEALTH_CLEANUP_INTERVAL",
2229 self.cluster_health.cleanup_interval_ms,
2230 );
2231
2232 self.tag_filtering.enabled =
2234 parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
2235
2236 if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
2238 if val.to_lowercase() == "none" || val == "0" {
2239 self.websocket.max_messages = None;
2240 } else if let Ok(n) = val.parse::<usize>() {
2241 self.websocket.max_messages = Some(n);
2242 }
2243 }
2244 if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
2245 if val.to_lowercase() == "none" || val == "0" {
2246 self.websocket.max_bytes = None;
2247 } else if let Ok(n) = val.parse::<usize>() {
2248 self.websocket.max_bytes = Some(n);
2249 }
2250 }
2251 self.websocket.disconnect_on_buffer_full = parse_bool_env(
2252 "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
2253 self.websocket.disconnect_on_buffer_full,
2254 );
2255 self.websocket.max_message_size = parse_env::<usize>(
2256 "WEBSOCKET_MAX_MESSAGE_SIZE",
2257 self.websocket.max_message_size,
2258 );
2259 self.websocket.max_frame_size =
2260 parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
2261 self.websocket.write_buffer_size = parse_env::<usize>(
2262 "WEBSOCKET_WRITE_BUFFER_SIZE",
2263 self.websocket.write_buffer_size,
2264 );
2265 self.websocket.max_backpressure = parse_env::<usize>(
2266 "WEBSOCKET_MAX_BACKPRESSURE",
2267 self.websocket.max_backpressure,
2268 );
2269 self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
2270 self.websocket.ping_interval =
2271 parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
2272 self.websocket.idle_timeout =
2273 parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
2274 if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
2275 self.websocket.compression = mode;
2276 }
2277
2278 Ok(())
2279 }
2280
2281 pub fn validate(&self) -> Result<(), String> {
2282 if self.unix_socket.enabled {
2283 if self.unix_socket.path.is_empty() {
2284 return Err(
2285 "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
2286 );
2287 }
2288
2289 self.validate_unix_socket_security()?;
2290
2291 if self.ssl.enabled {
2292 tracing::warn!(
2293 "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
2294 );
2295 }
2296
2297 if self.unix_socket.permission_mode > 0o777 {
2298 return Err(format!(
2299 "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
2300 self.unix_socket.permission_mode
2301 ));
2302 }
2303 }
2304
2305 if let Err(e) = self.cleanup.validate() {
2306 return Err(format!("Invalid cleanup configuration: {}", e));
2307 }
2308
2309 Ok(())
2310 }
2311
2312 fn validate_unix_socket_security(&self) -> Result<(), String> {
2313 let path = &self.unix_socket.path;
2314
2315 if path.contains("../") || path.contains("..\\") {
2316 return Err(
2317 "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
2318 );
2319 }
2320
2321 if self.unix_socket.permission_mode & 0o002 != 0 {
2322 warn!(
2323 "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
2324 self.unix_socket.permission_mode
2325 );
2326 }
2327
2328 if self.unix_socket.permission_mode & 0o007 > 0o005 {
2329 warn!(
2330 "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
2331 self.unix_socket.permission_mode
2332 );
2333 }
2334
2335 if self.mode == "production" && path.starts_with("/tmp/") {
2336 warn!(
2337 "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
2338 path
2339 );
2340 }
2341
2342 if !path.starts_with('/') {
2343 return Err(
2344 "Unix socket path must be absolute (start with /) for security and reliability."
2345 .to_string(),
2346 );
2347 }
2348
2349 Ok(())
2350 }
2351}