1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12 D: serde::Deserializer<'de>,
13{
14 use serde::de::{self};
15
16 let s = String::deserialize(deserializer)?;
17
18 if !s.chars().all(|c| c.is_digit(8)) {
20 return Err(de::Error::custom(format!(
21 "invalid octal permission mode '{}': must contain only digits 0-7",
22 s
23 )));
24 }
25
26 let mode = u32::from_str_radix(&s, 8)
28 .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30 if mode > 0o777 {
32 return Err(de::Error::custom(format!(
33 "permission mode '{}' exceeds maximum value 777",
34 s
35 )));
36 }
37
38 Ok(mode)
39}
40
41fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43 driver_str: String,
44 default_driver: T,
45 driver_name: &str,
46) -> T
47where
48 <T as FromStr>::Err: std::fmt::Debug,
49{
50 match T::from_str(&driver_str.to_lowercase()) {
51 Ok(driver_enum) => driver_enum,
52 Err(e) => {
53 warn!(
54 "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55 driver_name, driver_str, e, default_driver
56 );
57 default_driver
58 }
59 }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63 if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64 db_conn.pool_min = Some(min);
65 }
66 if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67 db_conn.pool_max = Some(max);
68 }
69}
70
/// Selects which adapter backend is used.
///
/// Serde names are the lower-cased variant names (`local`, `redis`, `nats`,
/// `pulsar`, `rabbitmq`, `kafka`) except where a variant is renamed
/// explicitly. `Local` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum AdapterDriver {
    #[default]
    Local,
    Redis,
    // Explicit rename: `lowercase` alone would give `rediscluster`.
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    Nats,
    Pulsar,
    RabbitMq,
    // Explicit rename: `lowercase` alone would give `googlepubsub`.
    #[serde(rename = "google-pubsub")]
    GooglePubSub,
    Kafka,
}
88
/// DynamoDB connection settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DynamoDbSettings {
    /// AWS region name; defaults to `us-east-1`.
    pub region: String,
    /// Table name; defaults to `sockudo-applications`.
    pub table_name: String,
    /// Optional endpoint override (presumably for local/emulated DynamoDB — confirm).
    pub endpoint_url: Option<String>,
    /// Optional explicit access key id.
    pub aws_access_key_id: Option<String>,
    /// Optional explicit secret access key.
    pub aws_secret_access_key: Option<String>,
    /// Optional AWS profile name.
    pub aws_profile_name: Option<String>,
}
99
100impl Default for DynamoDbSettings {
101 fn default() -> Self {
102 Self {
103 region: "us-east-1".to_string(),
104 table_name: "sockudo-applications".to_string(),
105 endpoint_url: None,
106 aws_access_key_id: None,
107 aws_secret_access_key: None,
108 aws_profile_name: None,
109 }
110 }
111}
112
/// ScyllaDB connection settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ScyllaDbSettings {
    /// Contact nodes; defaults to the single entry `127.0.0.1:9042`.
    pub nodes: Vec<String>,
    /// Keyspace name; defaults to `sockudo`.
    pub keyspace: String,
    /// Table name; defaults to `applications`.
    pub table_name: String,
    /// Optional username.
    pub username: Option<String>,
    /// Optional password.
    pub password: Option<String>,
    /// Replication class name; defaults to `SimpleStrategy`.
    pub replication_class: String,
    /// Replication factor; defaults to `3`.
    pub replication_factor: u32,
}
124
125impl Default for ScyllaDbSettings {
126 fn default() -> Self {
127 Self {
128 nodes: vec!["127.0.0.1:9042".to_string()],
129 keyspace: "sockudo".to_string(),
130 table_name: "applications".to_string(),
131 username: None,
132 password: None,
133 replication_class: "SimpleStrategy".to_string(),
134 replication_factor: 3,
135 }
136 }
137}
138
/// SurrealDB connection settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SurrealDbSettings {
    /// Server URL; defaults to `ws://127.0.0.1:8000`.
    pub url: String,
    /// Namespace; defaults to `sockudo`.
    pub namespace: String,
    /// Database name; defaults to `sockudo`.
    pub database: String,
    /// Username; defaults to `root`.
    pub username: String,
    /// Password; defaults to `root`.
    pub password: String,
    /// Table name; defaults to `applications`.
    pub table_name: String,
    /// Cache TTL; defaults to `300` (presumably seconds — confirm with the consumer).
    pub cache_ttl: u64,
    /// Cache capacity limit; defaults to `100`.
    pub cache_max_capacity: u64,
}
151
152impl Default for SurrealDbSettings {
153 fn default() -> Self {
154 Self {
155 url: "ws://127.0.0.1:8000".to_string(),
156 namespace: "sockudo".to_string(),
157 database: "sockudo".to_string(),
158 username: "root".to_string(),
159 password: "root".to_string(),
160 table_name: "applications".to_string(),
161 cache_ttl: 300,
162 cache_max_capacity: 100,
163 }
164 }
165}
166
167impl FromStr for AdapterDriver {
168 type Err = String;
169 fn from_str(s: &str) -> Result<Self, Self::Err> {
170 match s.to_lowercase().as_str() {
171 "local" => Ok(AdapterDriver::Local),
172 "redis" => Ok(AdapterDriver::Redis),
173 "redis-cluster" => Ok(AdapterDriver::RedisCluster),
174 "nats" => Ok(AdapterDriver::Nats),
175 "pulsar" => Ok(AdapterDriver::Pulsar),
176 "rabbitmq" | "rabbit-mq" => Ok(AdapterDriver::RabbitMq),
177 "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(AdapterDriver::GooglePubSub),
178 "kafka" => Ok(AdapterDriver::Kafka),
179 _ => Err(format!("Unknown adapter driver: {s}")),
180 }
181 }
182}
183
/// Selects the storage backend for the app manager.
///
/// Serde names are the lower-cased variant names (`memory`, `mysql`,
/// `dynamodb`, `pgsql`, `surrealdb`, `scylladb`). `Memory` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum AppManagerDriver {
    #[default]
    Memory,
    Mysql,
    Dynamodb,
    PgSql,
    SurrealDb,
    ScyllaDb,
}
195impl FromStr for AppManagerDriver {
196 type Err = String;
197 fn from_str(s: &str) -> Result<Self, Self::Err> {
198 match s.to_lowercase().as_str() {
199 "memory" => Ok(AppManagerDriver::Memory),
200 "mysql" => Ok(AppManagerDriver::Mysql),
201 "dynamodb" => Ok(AppManagerDriver::Dynamodb),
202 "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
203 "surreal" | "surrealdb" => Ok(AppManagerDriver::SurrealDb),
204 "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
205 _ => Err(format!("Unknown app manager driver: {s}")),
206 }
207 }
208}
209
/// Selects the cache backend.
///
/// Serde names are the lower-cased variant names (`memory`, `redis`, `none`)
/// plus the explicit `redis-cluster`. `Memory` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum CacheDriver {
    #[default]
    Memory,
    Redis,
    // Explicit rename: `lowercase` alone would give `rediscluster`.
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    None,
}
220
221impl FromStr for CacheDriver {
222 type Err = String;
223 fn from_str(s: &str) -> Result<Self, Self::Err> {
224 match s.to_lowercase().as_str() {
225 "memory" => Ok(CacheDriver::Memory),
226 "redis" => Ok(CacheDriver::Redis),
227 "redis-cluster" => Ok(CacheDriver::RedisCluster),
228 "none" => Ok(CacheDriver::None),
229 _ => Err(format!("Unknown cache driver: {s}")),
230 }
231 }
232}
233
/// Selects the queue backend.
///
/// Serde names are the lower-cased variant names except where a variant is
/// renamed explicitly. Note that the default is `Redis`, not `Memory`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum QueueDriver {
    Memory,
    #[default]
    Redis,
    // Explicit rename: `lowercase` alone would give `rediscluster`.
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    Nats,
    Pulsar,
    RabbitMq,
    // Explicit rename: `lowercase` alone would give `googlepubsub`.
    #[serde(rename = "google-pubsub")]
    GooglePubSub,
    Kafka,
    Sqs,
    Sns,
    None,
}
252
253impl FromStr for QueueDriver {
254 type Err = String;
255 fn from_str(s: &str) -> Result<Self, Self::Err> {
256 match s.to_lowercase().as_str() {
257 "memory" => Ok(QueueDriver::Memory),
258 "redis" => Ok(QueueDriver::Redis),
259 "redis-cluster" => Ok(QueueDriver::RedisCluster),
260 "nats" => Ok(QueueDriver::Nats),
261 "pulsar" => Ok(QueueDriver::Pulsar),
262 "rabbitmq" | "rabbit-mq" => Ok(QueueDriver::RabbitMq),
263 "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(QueueDriver::GooglePubSub),
264 "kafka" => Ok(QueueDriver::Kafka),
265 "sqs" => Ok(QueueDriver::Sqs),
266 "sns" => Ok(QueueDriver::Sns),
267 "none" => Ok(QueueDriver::None),
268 _ => Err(format!("Unknown queue driver: {s}")),
269 }
270 }
271}
272
/// Selects the backend used for delta coordination.
///
/// Serde names are the snake_case variant names (`auto`, `none`, `redis`,
/// `redis_cluster`, `nats`). `Auto` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum DeltaCoordinationBackend {
    #[default]
    Auto,
    None,
    Redis,
    RedisCluster,
    Nats,
}
283
284impl FromStr for DeltaCoordinationBackend {
285 type Err = String;
286
287 fn from_str(s: &str) -> Result<Self, Self::Err> {
288 match s.trim().to_ascii_lowercase().as_str() {
289 "auto" => Ok(Self::Auto),
290 "none" => Ok(Self::None),
291 "redis" => Ok(Self::Redis),
292 "redis-cluster" | "redis_cluster" => Ok(Self::RedisCluster),
293 "nats" => Ok(Self::Nats),
294 _ => Err(format!("Unknown delta coordination backend: {s}")),
295 }
296 }
297}
298
299impl AsRef<str> for QueueDriver {
300 fn as_ref(&self) -> &str {
301 match self {
302 QueueDriver::Memory => "memory",
303 QueueDriver::Redis => "redis",
304 QueueDriver::RedisCluster => "redis-cluster",
305 QueueDriver::Nats => "nats",
306 QueueDriver::Pulsar => "pulsar",
307 QueueDriver::RabbitMq => "rabbitmq",
308 QueueDriver::GooglePubSub => "google-pubsub",
309 QueueDriver::Kafka => "kafka",
310 QueueDriver::Sqs => "sqs",
311 QueueDriver::Sns => "sns",
312 QueueDriver::None => "none",
313 }
314 }
315}
316
/// Queue settings for the Redis Cluster queue driver.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisClusterQueueConfig {
    /// Worker concurrency; defaults to `5`.
    pub concurrency: u32,
    /// Optional key prefix; defaults to `sockudo_queue:`.
    pub prefix: Option<String>,
    /// Cluster node URLs; defaults to the single entry `redis://127.0.0.1:6379`.
    pub nodes: Vec<String>,
    /// Request timeout in milliseconds (per the field name); defaults to `5000`.
    pub request_timeout_ms: u64,
}
325
326impl Default for RedisClusterQueueConfig {
327 fn default() -> Self {
328 Self {
329 concurrency: 5,
330 prefix: Some("sockudo_queue:".to_string()),
331 nodes: vec!["redis://127.0.0.1:6379".to_string()],
332 request_timeout_ms: 5000,
333 }
334 }
335}
336
/// Selects the metrics backend; `prometheus` is the only (and default) option.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum MetricsDriver {
    #[default]
    Prometheus,
}
343
/// Log output format, serialized as `human` or `json`; `Human` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum LogOutputFormat {
    #[default]
    Human,
    Json,
}
351
352impl FromStr for LogOutputFormat {
353 type Err = String;
354 fn from_str(s: &str) -> Result<Self, Self::Err> {
355 match s.to_lowercase().as_str() {
356 "human" => Ok(LogOutputFormat::Human),
357 "json" => Ok(LogOutputFormat::Json),
358 _ => Err(format!("Unknown log output format: {s}")),
359 }
360 }
361}
362
363impl FromStr for MetricsDriver {
364 type Err = String;
365 fn from_str(s: &str) -> Result<Self, Self::Err> {
366 match s.to_lowercase().as_str() {
367 "prometheus" => Ok(MetricsDriver::Prometheus),
368 _ => Err(format!("Unknown metrics driver: {s}")),
369 }
370 }
371}
372
373impl AsRef<str> for MetricsDriver {
374 fn as_ref(&self) -> &str {
375 match self {
376 MetricsDriver::Prometheus => "prometheus",
377 }
378 }
379}
380
/// Top-level server configuration: one sub-configuration per subsystem plus
/// a handful of scalar settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ServerOptions {
    pub adapter: AdapterConfig,
    pub app_manager: AppManagerConfig,
    pub cache: CacheConfig,
    pub channel_limits: ChannelLimits,
    pub cors: CorsConfig,
    pub database: DatabaseConfig,
    pub database_pooling: DatabasePooling,
    pub debug: bool,
    pub event_limits: EventLimits,
    /// Host to listen on (presumably a bind address — confirm with the server setup code).
    pub host: String,
    pub http_api: HttpApiConfig,
    pub instance: InstanceConfig,
    /// Optional logging configuration; `None` when not provided.
    pub logging: Option<LoggingConfig>,
    pub metrics: MetricsConfig,
    pub mode: String,
    pub port: u16,
    pub path_prefix: String,
    pub presence: PresenceConfig,
    pub queue: QueueConfig,
    pub rate_limiter: RateLimiterConfig,
    // NOTE(review): unit of the grace period is not visible here — confirm.
    pub shutdown_grace_period: u64,
    pub ssl: SslConfig,
    // NOTE(review): unit of this timeout is not visible here — confirm.
    pub user_authentication_timeout: u64,
    pub webhooks: WebhooksConfig,
    /// Maximum WebSocket payload size in kilobytes (per the field name).
    pub websocket_max_payload_kb: u32,
    pub cleanup: CleanupConfig,
    // NOTE(review): unit of this timeout is not visible here — confirm.
    pub activity_timeout: u64,
    pub cluster_health: ClusterHealthConfig,
    pub unix_socket: UnixSocketConfig,
    pub delta_compression: DeltaCompressionOptionsConfig,
    pub tag_filtering: TagFilteringConfig,
    pub websocket: WebSocketConfig,
    pub connection_recovery: ConnectionRecoveryConfig,
    pub history: HistoryConfig,
    pub presence_history: PresenceHistoryConfig,
    pub idempotency: IdempotencyConfig,
    pub ephemeral: EphemeralConfig,
    pub echo_control: EchoControlConfig,
    pub event_name_filtering: EventNameFilteringConfig,
    pub versioned_messages: VersionedMessagesConfig,
}
426
/// Settings for the SQS queue driver.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SqsQueueConfig {
    /// AWS region name.
    pub region: String,
    /// Optional prefix used when building queue URLs.
    pub queue_url_prefix: Option<String>,
    // NOTE(review): presumably seconds, as in the SQS API — confirm.
    pub visibility_timeout: i32,
    /// Optional endpoint override.
    pub endpoint_url: Option<String>,
    pub max_messages: i32,
    /// Wait time in seconds (per the field name).
    pub wait_time_seconds: i32,
    pub concurrency: u32,
    /// Whether FIFO queues are used.
    pub fifo: bool,
    /// Optional message group id (presumably only relevant when `fifo` is set — confirm).
    pub message_group_id: Option<String>,
}
442
/// Settings for the SNS queue driver.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SnsQueueConfig {
    /// AWS region name.
    pub region: String,
    /// ARN of the target topic.
    pub topic_arn: String,
    /// Optional endpoint override.
    pub endpoint_url: Option<String>,
}
450
/// Adapter configuration: the selected driver plus the settings block of
/// every supported backend.
///
/// `#[serde(default)]` fills missing fields from `Default`; three fields
/// additionally name an explicit default function.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AdapterConfig {
    /// Which backend is active.
    pub driver: AdapterDriver,
    pub redis: RedisAdapterConfig,
    /// Redis Cluster adapter settings.
    pub cluster: RedisClusterAdapterConfig,
    pub nats: NatsAdapterConfig,
    pub pulsar: PulsarAdapterConfig,
    pub rabbitmq: RabbitMqAdapterConfig,
    pub google_pubsub: GooglePubSubAdapterConfig,
    pub kafka: KafkaAdapterConfig,
    /// Buffer multiplier per CPU; default supplied by `default_buffer_multiplier_per_cpu`.
    #[serde(default = "default_buffer_multiplier_per_cpu")]
    pub buffer_multiplier_per_cpu: usize,
    pub cluster_health: ClusterHealthConfig,
    /// Default supplied by `default_enable_socket_counting`.
    #[serde(default = "default_enable_socket_counting")]
    pub enable_socket_counting: bool,
    /// Default supplied by `default_fallback_to_local`.
    #[serde(default = "default_fallback_to_local")]
    pub fallback_to_local: bool,
}
470
/// Default for `AdapterConfig::enable_socket_counting`: `true`.
fn default_enable_socket_counting() -> bool {
    true
}
474
/// Default for `AdapterConfig::fallback_to_local`: `true`.
fn default_fallback_to_local() -> bool {
    true
}
478
/// Default for `AdapterConfig::buffer_multiplier_per_cpu`: `64`.
fn default_buffer_multiplier_per_cpu() -> usize {
    64
}
482
483impl Default for AdapterConfig {
484 fn default() -> Self {
485 Self {
486 driver: AdapterDriver::default(),
487 redis: RedisAdapterConfig::default(),
488 cluster: RedisClusterAdapterConfig::default(),
489 nats: NatsAdapterConfig::default(),
490 pulsar: PulsarAdapterConfig::default(),
491 rabbitmq: RabbitMqAdapterConfig::default(),
492 google_pubsub: GooglePubSubAdapterConfig::default(),
493 kafka: KafkaAdapterConfig::default(),
494 buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
495 cluster_health: ClusterHealthConfig::default(),
496 enable_socket_counting: default_enable_socket_counting(),
497 fallback_to_local: default_fallback_to_local(),
498 }
499 }
500}
501
/// Settings for the Redis adapter.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisAdapterConfig {
    // NOTE(review): unit of this timeout is not visible here — confirm.
    pub requests_timeout: u64,
    /// Prefix string (presumably for keys/channels — confirm).
    pub prefix: String,
    /// Free-form options for the publishing connection.
    pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
    /// Free-form options for the subscribing connection.
    pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
    pub cluster_mode: bool,
}
511
/// Settings for the Redis Cluster adapter.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisClusterAdapterConfig {
    /// Cluster node addresses.
    pub nodes: Vec<String>,
    pub prefix: String,
    /// Request timeout in milliseconds (per the field name).
    pub request_timeout_ms: u64,
    pub use_connection_manager: bool,
    // Field-level `default`: a missing value becomes `bool::default()`
    // (`false`) regardless of the struct-level `Default`.
    #[serde(default)]
    pub use_sharded_pubsub: bool,
}
522
/// Settings for the NATS adapter.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct NatsAdapterConfig {
    /// NATS server addresses.
    pub servers: Vec<String>,
    pub prefix: String,
    /// Request timeout in milliseconds (per the field name).
    pub request_timeout_ms: u64,
    /// Optional username.
    pub username: Option<String>,
    /// Optional password.
    pub password: Option<String>,
    /// Optional auth token.
    pub token: Option<String>,
    /// Connection timeout in milliseconds (per the field name).
    pub connection_timeout_ms: u64,
    /// Optional node count (presumably the expected cluster size — confirm).
    pub nodes_number: Option<u32>,
    /// Discovery wait bounds in milliseconds (per the field names).
    pub discovery_max_wait_ms: u64,
    pub discovery_idle_wait_ms: u64,
}
537
/// Settings for the Pulsar adapter.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PulsarAdapterConfig {
    /// Pulsar service URL.
    pub url: String,
    pub prefix: String,
    /// Request timeout in milliseconds (per the field name).
    pub request_timeout_ms: u64,
    /// Optional auth token.
    pub token: Option<String>,
    /// Optional node count (presumably the expected cluster size — confirm).
    pub nodes_number: Option<u32>,
}
547
/// Settings for the RabbitMQ adapter.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RabbitMqAdapterConfig {
    /// Broker URL.
    pub url: String,
    pub prefix: String,
    /// Request timeout in milliseconds (per the field name).
    pub request_timeout_ms: u64,
    /// Connection timeout in milliseconds (per the field name).
    pub connection_timeout_ms: u64,
    /// Optional node count (presumably the expected cluster size — confirm).
    pub nodes_number: Option<u32>,
}
557
/// Settings for the Google Pub/Sub adapter.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GooglePubSubAdapterConfig {
    /// Google Cloud project id.
    pub project_id: String,
    pub prefix: String,
    /// Request timeout in milliseconds (per the field name).
    pub request_timeout_ms: u64,
    /// Optional emulator host.
    pub emulator_host: Option<String>,
    /// Optional node count (presumably the expected cluster size — confirm).
    pub nodes_number: Option<u32>,
}
567
/// Settings for the Kafka adapter.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct KafkaAdapterConfig {
    /// Broker addresses.
    pub brokers: Vec<String>,
    pub prefix: String,
    /// Request timeout in milliseconds (per the field name).
    pub request_timeout_ms: u64,
    /// Optional security protocol name.
    pub security_protocol: Option<String>,
    /// Optional SASL mechanism and credentials.
    pub sasl_mechanism: Option<String>,
    pub sasl_username: Option<String>,
    pub sasl_password: Option<String>,
    /// Optional node count (presumably the expected cluster size — confirm).
    pub nodes_number: Option<u32>,
}
580
/// App manager configuration: the selected driver plus per-backend settings.
///
/// `Default` is derived, so every field starts from its own type's default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct AppManagerConfig {
    /// Which backend stores the applications.
    pub driver: AppManagerDriver,
    /// Statically configured applications.
    pub array: ArrayConfig,
    pub cache: CacheSettings,
    pub scylladb: ScyllaDbSettings,
}
589
/// A list of applications given directly in the configuration; empty by
/// default (derived `Default`).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ArrayConfig {
    pub apps: Vec<App>,
}
595
/// Cache toggle and TTL used by the app manager configuration.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CacheSettings {
    pub enabled: bool,
    // NOTE(review): unit of the TTL is not visible here — confirm.
    pub ttl: u64,
}
602
/// Options for the in-memory cache.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MemoryCacheOptions {
    // NOTE(review): units of `ttl` and `cleanup_interval` are not visible here — confirm.
    pub ttl: u64,
    pub cleanup_interval: u64,
    /// Upper bound on the cache capacity.
    pub max_capacity: u64,
}
610
/// Cache configuration: the selected driver plus Redis and in-memory options.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CacheConfig {
    pub driver: CacheDriver,
    pub redis: RedisConfig,
    pub memory: MemoryCacheOptions,
}
618
/// Redis options for the cache; with the derived `Default` both options are
/// `None` and `cluster_mode` is `false`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RedisConfig {
    /// Optional key prefix.
    pub prefix: Option<String>,
    /// Optional URL that overrides the otherwise configured connection
    /// (presumably the shared Redis connection — confirm with the consumer).
    pub url_override: Option<String>,
    pub cluster_mode: bool,
}
626
/// Limits that apply to channels.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ChannelLimits {
    /// Maximum channel name length.
    pub max_name_length: u32,
    // NOTE(review): unit of the TTL is not visible here — confirm.
    pub cache_ttl: u64,
}
633
/// CORS configuration.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CorsConfig {
    pub credentials: bool,
    /// Allowed origin patterns; validated during deserialization by
    /// `deserialize_and_validate_cors_origins`, which rejects invalid patterns.
    #[serde(deserialize_with = "deserialize_and_validate_cors_origins")]
    pub origin: Vec<String>,
    pub methods: Vec<String>,
    pub allowed_headers: Vec<String>,
}
643
644fn deserialize_and_validate_cors_origins<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
645where
646 D: serde::Deserializer<'de>,
647{
648 use serde::de::Error;
649 let origins = Vec::<String>::deserialize(deserializer)?;
650
651 if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&origins) {
652 return Err(D::Error::custom(format!(
653 "CORS origin pattern validation failed: {}",
654 e
655 )));
656 }
657
658 Ok(origins)
659}
660
/// Connection settings for every supported database, one block per backend.
///
/// `Default` is derived, so every block starts from its own type's default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct DatabaseConfig {
    pub mysql: DatabaseConnection,
    pub postgres: DatabaseConnection,
    pub redis: RedisConnection,
    pub dynamodb: DynamoDbSettings,
    pub surrealdb: SurrealDbSettings,
    pub scylladb: ScyllaDbSettings,
}
671
/// Connection settings for a SQL database (used for both the `mysql` and
/// `postgres` blocks of `DatabaseConfig`).
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabaseConnection {
    pub host: String,
    pub port: u16,
    pub username: String,
    pub password: String,
    pub database: String,
    pub table_name: String,
    pub connection_pool_size: u32,
    /// Optional pool bounds; `override_db_pool_settings` sets these from
    /// `<prefix>_POOL_MIN` / `<prefix>_POOL_MAX` when those are present.
    pub pool_min: Option<u32>,
    pub pool_max: Option<u32>,
    // NOTE(review): units of the cache TTL and cleanup interval are not visible here — confirm.
    pub cache_ttl: u64,
    pub cache_cleanup_interval: u64,
    pub cache_max_capacity: u64,
}
688
/// Redis connection settings covering standalone, sentinel and cluster setups.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisConnection {
    /// Host name; may carry a `redis://` or `rediss://` prefix, which
    /// `to_url` uses as the URL scheme.
    pub host: String,
    pub port: u16,
    /// Database index appended to the URL path.
    pub db: u32,
    pub username: Option<String>,
    pub password: Option<String>,
    pub key_prefix: String,
    /// Sentinel endpoints; a non-empty list switches `to_url` to the
    /// `redis+sentinel://` form.
    pub sentinels: Vec<RedisSentinel>,
    /// Optional password placed in the user-info part of the sentinel URL.
    pub sentinel_password: Option<String>,
    /// Name inserted as the first path segment of the sentinel URL
    /// (presumably the sentinel master name — confirm).
    pub name: String,
    /// Cluster settings; its node list takes precedence over `cluster_nodes`.
    pub cluster: RedisClusterConnection,
    /// Cluster nodes used when `cluster.nodes` is empty.
    pub cluster_nodes: Vec<ClusterNode>,
}
705
/// Address of a single Redis sentinel.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisSentinel {
    pub host: String,
    pub port: u16,
}
712
/// Redis Cluster connection settings; with the derived `Default` the node
/// list is empty, credentials are unset and TLS is off.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RedisClusterConnection {
    pub nodes: Vec<ClusterNode>,
    /// Cluster-specific credentials; when unset, the credentials of the
    /// enclosing `RedisConnection` are used for cluster URLs.
    pub username: Option<String>,
    pub password: Option<String>,
    /// Selects the `rediss` scheme for plain host entries; also accepted
    /// under the key `useTLS`.
    #[serde(alias = "useTLS")]
    pub use_tls: bool,
}
722
723impl RedisConnection {
724 pub fn is_sentinel_configured(&self) -> bool {
726 !self.sentinels.is_empty()
727 }
728
729 pub fn to_url(&self) -> String {
731 if self.is_sentinel_configured() {
732 self.build_sentinel_url()
733 } else {
734 self.build_standard_url()
735 }
736 }
737
738 fn build_standard_url(&self) -> String {
739 let (scheme, host) = if self.host.starts_with("rediss://") {
741 ("rediss://", self.host.trim_start_matches("rediss://"))
742 } else if self.host.starts_with("redis://") {
743 ("redis://", self.host.trim_start_matches("redis://"))
744 } else {
745 ("redis://", self.host.as_str())
746 };
747
748 let mut url = String::from(scheme);
749
750 if let Some(ref username) = self.username {
751 url.push_str(username);
752 if let Some(ref password) = self.password {
753 url.push(':');
754 url.push_str(&urlencoding::encode(password));
755 }
756 url.push('@');
757 } else if let Some(ref password) = self.password {
758 url.push(':');
759 url.push_str(&urlencoding::encode(password));
760 url.push('@');
761 }
762
763 url.push_str(host);
764 url.push(':');
765 url.push_str(&self.port.to_string());
766 url.push('/');
767 url.push_str(&self.db.to_string());
768
769 url
770 }
771
772 fn build_sentinel_url(&self) -> String {
773 let mut url = String::from("redis+sentinel://");
774
775 if let Some(ref sentinel_password) = self.sentinel_password {
776 url.push(':');
777 url.push_str(&urlencoding::encode(sentinel_password));
778 url.push('@');
779 }
780
781 let sentinel_hosts: Vec<String> = self
782 .sentinels
783 .iter()
784 .map(|s| format!("{}:{}", s.host, s.port))
785 .collect();
786 url.push_str(&sentinel_hosts.join(","));
787
788 url.push('/');
789 url.push_str(&self.name);
790 url.push('/');
791 url.push_str(&self.db.to_string());
792
793 let mut params = Vec::new();
794 if let Some(ref password) = self.password {
795 params.push(format!("password={}", urlencoding::encode(password)));
796 }
797 if let Some(ref username) = self.username {
798 params.push(format!("username={}", urlencoding::encode(username)));
799 }
800
801 if !params.is_empty() {
802 url.push('?');
803 url.push_str(¶ms.join("&"));
804 }
805
806 url
807 }
808
809 pub fn has_cluster_nodes(&self) -> bool {
812 !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
813 }
814
815 pub fn cluster_node_urls(&self) -> Vec<String> {
818 if !self.cluster.nodes.is_empty() {
819 return self.build_cluster_urls(&self.cluster.nodes);
820 }
821 self.build_cluster_urls(&self.cluster_nodes)
822 }
823
824 pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
827 self.build_cluster_urls(
828 &seeds
829 .iter()
830 .filter_map(|seed| ClusterNode::from_seed(seed))
831 .collect::<Vec<ClusterNode>>(),
832 )
833 }
834
835 fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
836 let username = self
837 .cluster
838 .username
839 .as_deref()
840 .or(self.username.as_deref());
841 let password = self
842 .cluster
843 .password
844 .as_deref()
845 .or(self.password.as_deref());
846 let use_tls = self.cluster.use_tls;
847
848 nodes
849 .iter()
850 .map(|node| node.to_url_with_options(use_tls, username, password))
851 .collect()
852 }
853}
854
855impl RedisSentinel {
856 pub fn to_host_port(&self) -> String {
857 format!("{}:{}", self.host, self.port)
858 }
859}
860
/// A single Redis Cluster node.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ClusterNode {
    /// Plain host, `host:port`, bracketed IPv6 address, or a full
    /// `redis://` / `rediss://` URL.
    pub host: String,
    /// Port used when `host` does not carry one itself.
    pub port: u16,
}
867
868impl ClusterNode {
869 pub fn to_url(&self) -> String {
870 self.to_url_with_options(false, None, None)
871 }
872
873 pub fn to_url_with_options(
874 &self,
875 use_tls: bool,
876 username: Option<&str>,
877 password: Option<&str>,
878 ) -> String {
879 let host = self.host.trim();
880
881 if host.starts_with("redis://") || host.starts_with("rediss://") {
882 if let Ok(parsed) = Url::parse(host)
883 && let Some(host_str) = parsed.host_str()
884 {
885 let scheme = parsed.scheme();
886 let port = parsed.port_or_known_default().unwrap_or(self.port);
887 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
888 let parsed_password = parsed.password();
889 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
890 let (effective_username, effective_password) = if has_embedded_auth {
891 (parsed_username, parsed_password)
892 } else {
893 (username, password)
894 };
895
896 return build_redis_url(
897 scheme,
898 host_str,
899 port,
900 effective_username,
901 effective_password,
902 );
903 }
904
905 let has_port = if let Some(bracket_pos) = host.rfind(']') {
907 host[bracket_pos..].contains(':')
908 } else {
909 host.split(':').count() >= 3
910 };
911 let base = if has_port {
912 host.to_string()
913 } else {
914 format!("{}:{}", host, self.port)
915 };
916
917 if let Ok(parsed) = Url::parse(&base) {
918 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
919 let parsed_password = parsed.password();
920 if let Some(host_str) = parsed.host_str() {
921 let port = parsed.port_or_known_default().unwrap_or(self.port);
922 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
923 let (effective_username, effective_password) = if has_embedded_auth {
924 (parsed_username, parsed_password)
925 } else {
926 (username, password)
927 };
928 return build_redis_url(
929 parsed.scheme(),
930 host_str,
931 port,
932 effective_username,
933 effective_password,
934 );
935 }
936 }
937 return base;
938 }
939
940 let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
941 let scheme = if use_tls { "rediss" } else { "redis" };
942 build_redis_url(
943 scheme,
944 &normalized_host,
945 normalized_port,
946 username,
947 password,
948 )
949 }
950
951 pub fn from_seed(seed: &str) -> Option<Self> {
952 let trimmed = seed.trim();
953 if trimmed.is_empty() {
954 return None;
955 }
956
957 if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
958 let port = Url::parse(trimmed)
959 .ok()
960 .and_then(|parsed| parsed.port_or_known_default())
961 .unwrap_or(6379);
962 return Some(Self {
963 host: trimmed.to_string(),
964 port,
965 });
966 }
967
968 let (host, port) = split_plain_host_and_port(trimmed, 6379);
969 Some(Self { host, port })
970 }
971}
972
/// Splits a plain host specification into host and port.
///
/// * `[addr]` and `[addr]:port` yield `addr` without the brackets; a missing
///   or unparsable port falls back to `default_port`.
/// * `host:port` with exactly one colon and a valid port is split.
/// * Everything else (including an unbracketed IPv6 address or an unclosed
///   bracket) is returned whole with `default_port`.
///
/// Surrounding whitespace is trimmed first.
fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
    let trimmed = raw_host.trim();

    if let Some(after_open) = trimmed.strip_prefix('[') {
        return match after_open.find(']') {
            Some(close) => {
                let address = &after_open[..close];
                let port = after_open[close + 1..]
                    .strip_prefix(':')
                    .and_then(|text| text.parse::<u16>().ok())
                    .unwrap_or(default_port);
                (address.to_string(), port)
            }
            // No closing bracket: keep the input untouched.
            None => (trimmed.to_string(), default_port),
        };
    }

    // Exactly two colon-separated pieces means a single `host:port` colon.
    let mut pieces = trimmed.split(':');
    if let (Some(name), Some(port_text), None) = (pieces.next(), pieces.next(), pieces.next()) {
        if let Ok(port) = port_text.parse::<u16>() {
            return (name.to_string(), port);
        }
    }

    (trimmed.to_string(), default_port)
}
1001
1002fn build_redis_url(
1003 scheme: &str,
1004 host: &str,
1005 port: u16,
1006 username: Option<&str>,
1007 password: Option<&str>,
1008) -> String {
1009 let mut url = format!("{scheme}://");
1010
1011 if let Some(user) = username {
1012 url.push_str(&urlencoding::encode(user));
1013 if let Some(pass) = password {
1014 url.push(':');
1015 url.push_str(&urlencoding::encode(pass));
1016 }
1017 url.push('@');
1018 } else if let Some(pass) = password {
1019 url.push(':');
1020 url.push_str(&urlencoding::encode(pass));
1021 url.push('@');
1022 }
1023
1024 if host.contains(':') && !host.starts_with('[') {
1025 url.push('[');
1026 url.push_str(host);
1027 url.push(']');
1028 } else {
1029 url.push_str(host);
1030 }
1031 url.push(':');
1032 url.push_str(&port.to_string());
1033 url
1034}
1035
/// Global database pooling settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabasePooling {
    pub enabled: bool,
    /// Lower and upper pool bounds.
    pub min: u32,
    pub max: u32,
}
1043
/// Limits that apply to published events.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EventLimits {
    pub max_channels_at_once: u32,
    pub max_name_length: u32,
    /// Maximum payload size in kilobytes (per the field name).
    pub max_payload_in_kb: u32,
    pub max_batch_size: u32,
}
1052
/// Idempotency settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct IdempotencyConfig {
    pub enabled: bool,
    /// Time-to-live in seconds (per the field name).
    pub ttl_seconds: u64,
    /// Maximum accepted key length.
    pub max_key_length: usize,
}
1063
/// Toggle for the ephemeral feature.
///
/// `#[serde(default)]`: a missing `enabled` is taken from the `Default`
/// implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EphemeralConfig {
    pub enabled: bool,
}
1070
/// Echo control settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EchoControlConfig {
    pub enabled: bool,
    // NOTE(review): presumably whether senders receive their own messages
    // unless they opt out — confirm with the consumer of this flag.
    pub default_echo_messages: bool,
}
1079
/// Event name filtering settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EventNameFilteringConfig {
    pub enabled: bool,
    /// Maximum number of event names in one filter.
    pub max_events_per_filter: usize,
    /// Maximum length of a single event name.
    pub max_event_name_length: usize,
}
1090
/// Connection recovery settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ConnectionRecoveryConfig {
    pub enabled: bool,
    /// Buffer time-to-live in seconds (per the field name).
    pub buffer_ttl_seconds: u64,
    // NOTE(review): whether this counts messages or bytes is not visible here — confirm.
    pub max_buffer_size: usize,
}
1104
/// Selects the storage backend for versioned messages.
///
/// Serde names are the lower-cased variant names (`memory`, `postgres`,
/// `mysql`, `dynamodb`, `scylladb`, `surrealdb`). `Memory` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum VersionStoreDriver {
    #[default]
    Memory,
    Postgres,
    Mysql,
    DynamoDb,
    ScyllaDb,
    SurrealDb,
}
1116
1117impl FromStr for VersionStoreDriver {
1118 type Err = String;
1119 fn from_str(s: &str) -> Result<Self, Self::Err> {
1120 match s.to_lowercase().as_str() {
1121 "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1122 "mysql" => Ok(Self::Mysql),
1123 "dynamodb" => Ok(Self::DynamoDb),
1124 "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1125 "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1126 "memory" => Ok(Self::Memory),
1127 _ => Err(format!("Unknown version store driver: {s}")),
1128 }
1129 }
1130}
1131
/// Settings for versioned messages.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct VersionedMessagesConfig {
    pub enabled: bool,
    /// Which backend stores the versions.
    pub driver: VersionStoreDriver,
    pub max_page_size: usize,
    /// Retention window in seconds (per the field name).
    pub retention_window_seconds: u64,
    /// Purge interval in seconds (per the field name).
    pub purge_interval_seconds: u64,
    pub purge_batch_size: usize,
    pub max_purge_per_tick: usize,
}
1160
/// Selects the storage backend for history.
///
/// Serde names are the lower-cased variant names (`postgres`, `mysql`,
/// `dynamodb`, `surrealdb`, `scylladb`, `memory`). `Postgres` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum HistoryBackend {
    #[default]
    Postgres,
    Mysql,
    DynamoDb,
    SurrealDb,
    ScyllaDb,
    Memory,
}
1172
1173impl FromStr for HistoryBackend {
1174 type Err = String;
1175 fn from_str(s: &str) -> Result<Self, Self::Err> {
1176 match s.to_lowercase().as_str() {
1177 "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1178 "mysql" => Ok(Self::Mysql),
1179 "dynamodb" => Ok(Self::DynamoDb),
1180 "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1181 "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1182 "memory" => Ok(Self::Memory),
1183 _ => Err(format!("Unknown history backend: {s}")),
1184 }
1185 }
1186}
1187
/// PostgreSQL-specific history settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PostgresHistoryConfig {
    /// Prefix for history table names; defaults to `sockudo_history`.
    pub table_prefix: String,
    /// Write timeout in milliseconds (per the field name); defaults to `5000`.
    pub write_timeout_ms: u64,
}
1194
1195impl Default for PostgresHistoryConfig {
1196 fn default() -> Self {
1197 Self {
1198 table_prefix: "sockudo_history".to_string(),
1199 write_timeout_ms: 5000,
1200 }
1201 }
1202}
1203
/// MySQL-specific history settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MySqlHistoryConfig {
    /// Prefix for history table names; defaults to `sockudo_history`.
    pub table_prefix: String,
    /// Write timeout in milliseconds (per the field name); defaults to `5000`.
    pub write_timeout_ms: u64,
}
1210
1211impl Default for MySqlHistoryConfig {
1212 fn default() -> Self {
1213 Self {
1214 table_prefix: "sockudo_history".to_string(),
1215 write_timeout_ms: 5000,
1216 }
1217 }
1218}
1219
/// DynamoDB-specific history settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from the
/// `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DynamoDbHistoryConfig {
    /// Prefix for history table names; defaults to `sockudo_history`.
    pub table_prefix: String,
    /// Write timeout in milliseconds (per the field name); defaults to `5000`.
    pub write_timeout_ms: u64,
}
1226
1227impl Default for DynamoDbHistoryConfig {
1228 fn default() -> Self {
1229 Self {
1230 table_prefix: "sockudo_history".to_string(),
1231 write_timeout_ms: 5000,
1232 }
1233 }
1234}
1235
/// SurrealDB section of the history configuration.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`SurrealDbHistoryConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SurrealDbHistoryConfig {
    /// Defaults to `"sockudo_history"`.
    pub table_prefix: String,
    /// Defaults to `5000` (presumably milliseconds, per the `_ms` suffix).
    pub write_timeout_ms: u64,
}
1242
1243impl Default for SurrealDbHistoryConfig {
1244 fn default() -> Self {
1245 Self {
1246 table_prefix: "sockudo_history".to_string(),
1247 write_timeout_ms: 5000,
1248 }
1249 }
1250}
1251
/// ScyllaDB section of the history configuration.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`ScyllaDbHistoryConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ScyllaDbHistoryConfig {
    /// Defaults to `"sockudo_history"`.
    pub table_prefix: String,
    /// Defaults to `5000` (presumably milliseconds, per the `_ms` suffix).
    pub write_timeout_ms: u64,
}
1258
1259impl Default for ScyllaDbHistoryConfig {
1260 fn default() -> Self {
1261 Self {
1262 table_prefix: "sockudo_history".to_string(),
1263 write_timeout_ms: 5000,
1264 }
1265 }
1266}
1267
/// History settings, including one nested section per supported backend.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`HistoryConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HistoryConfig {
    /// Defaults to `false`.
    pub enabled: bool,
    /// Defaults to `true`.
    pub rewind_enabled: bool,
    /// Defaults to `HistoryBackend::Postgres`.
    pub backend: HistoryBackend,
    /// Defaults to `86400`.
    pub retention_window_seconds: u64,
    /// Defaults to `100`.
    pub max_page_size: usize,
    /// Defaults to `None`.
    pub max_messages_per_channel: Option<usize>,
    /// Defaults to `None`.
    pub max_bytes_per_channel: Option<u64>,
    /// Defaults to `16`.
    pub writer_shards: usize,
    /// Defaults to `4096`.
    pub writer_queue_capacity: usize,
    /// Defaults to `300`.
    pub purge_interval_seconds: u64,
    /// Defaults to `1000`.
    pub purge_batch_size: usize,
    /// Defaults to `100_000`.
    pub max_purge_per_tick: usize,
    /// Postgres-specific section.
    pub postgres: PostgresHistoryConfig,
    /// MySQL-specific section.
    pub mysql: MySqlHistoryConfig,
    /// DynamoDB-specific section.
    pub dynamodb: DynamoDbHistoryConfig,
    /// SurrealDB-specific section.
    pub surrealdb: SurrealDbHistoryConfig,
    /// ScyllaDB-specific section.
    pub scylladb: ScyllaDbHistoryConfig,
}
1289
1290impl Default for HistoryConfig {
1291 fn default() -> Self {
1292 Self {
1293 enabled: false,
1294 rewind_enabled: true,
1295 backend: HistoryBackend::Postgres,
1296 retention_window_seconds: 86400,
1297 max_page_size: 100,
1298 max_messages_per_channel: None,
1299 max_bytes_per_channel: None,
1300 writer_shards: 16,
1301 writer_queue_capacity: 4096,
1302 purge_interval_seconds: 300,
1303 purge_batch_size: 1000,
1304 max_purge_per_tick: 100_000,
1305 postgres: PostgresHistoryConfig::default(),
1306 mysql: MySqlHistoryConfig::default(),
1307 dynamodb: DynamoDbHistoryConfig::default(),
1308 surrealdb: SurrealDbHistoryConfig::default(),
1309 scylladb: ScyllaDbHistoryConfig::default(),
1310 }
1311 }
1312}
1313
1314impl Default for VersionedMessagesConfig {
1315 fn default() -> Self {
1316 Self {
1317 enabled: false,
1318 driver: VersionStoreDriver::Memory,
1319 max_page_size: 100,
1320 retention_window_seconds: 0,
1321 purge_interval_seconds: 300,
1322 purge_batch_size: 1000,
1323 max_purge_per_tick: 100_000,
1324 }
1325 }
1326}
1327
/// Presence-history settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`PresenceHistoryConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PresenceHistoryConfig {
    /// Defaults to `false`.
    pub enabled: bool,
    /// Defaults to `86400`.
    pub retention_window_seconds: u64,
    /// Defaults to `100`.
    pub max_page_size: usize,
    /// Defaults to `None`.
    pub max_events_per_channel: Option<usize>,
    /// Defaults to `None`.
    pub max_bytes_per_channel: Option<u64>,
}
1337
1338impl Default for PresenceHistoryConfig {
1339 fn default() -> Self {
1340 Self {
1341 enabled: false,
1342 retention_window_seconds: 86400,
1343 max_page_size: 100,
1344 max_events_per_channel: None,
1345 max_bytes_per_channel: None,
1346 }
1347 }
1348}
1349
/// HTTP API settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`HttpApiConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HttpApiConfig {
    /// Defaults to `10`.
    pub request_limit_in_mb: u32,
    /// Defaults to `AcceptTraffic::default()`.
    pub accept_traffic: AcceptTraffic,
    /// Defaults to `true`.
    pub usage_enabled: bool,
}
1357
/// Traffic-acceptance settings nested under [`HttpApiConfig`].
///
/// `#[serde(default)]`: a missing field is taken from [`AcceptTraffic::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AcceptTraffic {
    /// Defaults to `0.90`.
    pub memory_threshold: f64,
}
1363
/// Instance identity settings.
///
/// `#[serde(default)]`: a missing field is taken from [`InstanceConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct InstanceConfig {
    /// Defaults to a freshly generated UUID v4 string, so every call to
    /// `default()` produces a different value.
    pub process_id: String,
}
1369
/// Metrics settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`MetricsConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MetricsConfig {
    /// Defaults to `true`.
    pub enabled: bool,
    /// Defaults to `MetricsDriver::default()`.
    pub driver: MetricsDriver,
    /// Defaults to `"0.0.0.0"`.
    pub host: String,
    /// Defaults to `PrometheusConfig::default()`.
    pub prometheus: PrometheusConfig,
    /// Defaults to `9601`.
    pub port: u16,
}
1379
/// Prometheus settings nested under [`MetricsConfig`].
///
/// `#[serde(default)]`: a missing field is taken from [`PrometheusConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PrometheusConfig {
    /// Defaults to `"sockudo_"`.
    pub prefix: String,
}
1385
/// Logging settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`LoggingConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingConfig {
    /// Defaults to `true`.
    pub colors_enabled: bool,
    /// Defaults to `true`.
    pub include_target: bool,
}
1392
/// Presence-channel limits.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`PresenceConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PresenceConfig {
    /// Defaults to `100`.
    pub max_members_per_channel: u32,
    /// Defaults to `2`.
    pub max_member_size_in_kb: u32,
}
1399
/// WebSocket settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`WebSocketConfig::default`]. The values are consumed by
/// `to_buffer_config` and `to_sockudo_ws_config`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebSocketConfig {
    /// Message-count buffer limit used by `to_buffer_config`; defaults to `Some(1000)`.
    pub max_messages: Option<usize>,
    /// Byte buffer limit used by `to_buffer_config`; when set it is also passed as
    /// the maximum payload length in `to_sockudo_ws_config`. Defaults to `None`.
    pub max_bytes: Option<usize>,
    /// Forwarded as `disconnect_on_full`; defaults to `true`.
    pub disconnect_on_buffer_full: bool,
    /// Defaults to `64 * 1024 * 1024`.
    pub max_message_size: usize,
    /// Defaults to `16 * 1024 * 1024`.
    pub max_frame_size: usize,
    /// Defaults to `16 * 1024`.
    pub write_buffer_size: usize,
    /// Defaults to `1024 * 1024`.
    pub max_backpressure: usize,
    /// Defaults to `true`.
    pub auto_ping: bool,
    /// Defaults to `30`; `to_sockudo_ws_config` raises it to at least
    /// `max(activity_timeout / 2, 5)`.
    pub ping_interval: u32,
    /// Defaults to `120`.
    pub idle_timeout: u32,
    /// Compression mode name, matched case-insensitively in `to_sockudo_ws_config`
    /// (`dedicated`, `shared`, `window256b` … `window32kb`); any other value
    /// selects `Compression::Disabled`. Defaults to `"disabled"`.
    pub compression: String,
}
1417
1418impl Default for WebSocketConfig {
1419 fn default() -> Self {
1420 Self {
1421 max_messages: Some(1000),
1422 max_bytes: None,
1423 disconnect_on_buffer_full: true,
1424 max_message_size: 64 * 1024 * 1024,
1425 max_frame_size: 16 * 1024 * 1024,
1426 write_buffer_size: 16 * 1024,
1427 max_backpressure: 1024 * 1024,
1428 auto_ping: true,
1429 ping_interval: 30,
1430 idle_timeout: 120,
1431 compression: "disabled".to_string(),
1432 }
1433 }
1434}
1435
1436impl WebSocketConfig {
1437 pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
1439 use crate::websocket::{BufferLimit, WebSocketBufferConfig};
1440
1441 let limit = match (self.max_messages, self.max_bytes) {
1442 (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
1443 (Some(messages), None) => BufferLimit::Messages(messages),
1444 (None, Some(bytes)) => BufferLimit::Bytes(bytes),
1445 (None, None) => BufferLimit::Messages(1000),
1446 };
1447
1448 WebSocketBufferConfig {
1449 limit,
1450 disconnect_on_full: self.disconnect_on_buffer_full,
1451 }
1452 }
1453
1454 pub fn to_sockudo_ws_config(
1456 &self,
1457 websocket_max_payload_kb: u32,
1458 activity_timeout: u64,
1459 ) -> sockudo_ws::Config {
1460 use sockudo_ws::Compression;
1461
1462 let compression = match self.compression.to_lowercase().as_str() {
1463 "dedicated" => Compression::Dedicated,
1464 "shared" => Compression::Shared,
1465 "window256b" => Compression::Window256B,
1466 "window1kb" => Compression::Window1KB,
1467 "window2kb" => Compression::Window2KB,
1468 "window4kb" => Compression::Window4KB,
1469 "window8kb" => Compression::Window8KB,
1470 "window16kb" => Compression::Window16KB,
1471 "window32kb" => Compression::Window32KB,
1472 _ => Compression::Disabled,
1473 };
1474
1475 sockudo_ws::Config::builder()
1476 .max_payload_length(
1477 self.max_bytes
1478 .unwrap_or(websocket_max_payload_kb as usize * 1024),
1479 )
1480 .max_message_size(self.max_message_size)
1481 .max_frame_size(self.max_frame_size)
1482 .write_buffer_size(self.write_buffer_size)
1483 .max_backpressure(self.max_backpressure)
1484 .idle_timeout(self.idle_timeout)
1485 .auto_ping(self.auto_ping)
1486 .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1487 .compression(compression)
1488 .build()
1489 }
1490}
1491
/// Queue settings: the selected driver plus one section per supported backend.
///
/// `Default` is derived, so each field starts from its own type's `Default`;
/// `#[serde(default)]` applies the same values to fields missing from the input.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct QueueConfig {
    pub driver: QueueDriver,
    pub redis: RedisQueueConfig,
    pub redis_cluster: RedisClusterQueueConfig,
    pub nats: NatsAdapterConfig,
    pub pulsar: PulsarAdapterConfig,
    pub rabbitmq: RabbitMqAdapterConfig,
    pub google_pubsub: GooglePubSubAdapterConfig,
    pub kafka: KafkaAdapterConfig,
    pub sqs: SqsQueueConfig,
    pub sns: SnsQueueConfig,
}
1506
/// Redis section of the queue configuration.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`RedisQueueConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisQueueConfig {
    /// Defaults to `5`.
    pub concurrency: u32,
    /// Defaults to `Some("sockudo_queue:")`.
    pub prefix: Option<String>,
    /// Defaults to `None`.
    pub url_override: Option<String>,
    /// Defaults to `false`.
    pub cluster_mode: bool,
}
1515
/// A single rate-limit rule.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`RateLimit::default`].
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct RateLimit {
    /// Defaults to `60`.
    pub max_requests: u32,
    /// Defaults to `60`.
    pub window_seconds: u64,
    /// Defaults to `Some("default")`.
    pub identifier: Option<String>,
    /// Defaults to `Some(0)`.
    pub trust_hops: Option<u32>,
}
1524
/// Rate-limiter settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`RateLimiterConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RateLimiterConfig {
    /// Defaults to `true`.
    pub enabled: bool,
    /// Defaults to `CacheDriver::Memory`.
    pub driver: CacheDriver,
    /// Defaults to 100 requests per 60-second window, identifier `"api"`.
    pub api_rate_limit: RateLimit,
    /// Defaults to 20 requests per 60-second window, identifier `"websocket_connect"`.
    pub websocket_rate_limit: RateLimit,
    /// Defaults to prefix `"sockudo_rl:"`, no URL override, cluster mode off.
    pub redis: RedisConfig,
}
1534
/// SSL settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`SslConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SslConfig {
    /// Defaults to `false`.
    pub enabled: bool,
    /// Defaults to the empty string.
    pub cert_path: String,
    /// Defaults to the empty string.
    pub key_path: String,
    /// Defaults to `None`.
    pub passphrase: Option<String>,
    /// Defaults to `None`.
    pub ca_path: Option<String>,
    /// Defaults to `false`.
    pub redirect_http: bool,
    /// Defaults to `Some(80)`.
    pub http_port: Option<u16>,
}
1546
/// Webhook delivery settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`WebhooksConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebhooksConfig {
    /// Defaults to `BatchingConfig::default()`.
    pub batching: BatchingConfig,
    /// Defaults to `WebhookRetryConfig::default()`.
    pub retry: WebhookRetryConfig,
    /// Defaults to `10_000`.
    pub request_timeout_ms: u64,
}
1554
/// Webhook batching settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`BatchingConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct BatchingConfig {
    /// Defaults to `true`.
    pub enabled: bool,
    /// Defaults to `50` (unit not visible here — TODO confirm).
    pub duration: u64,
    /// Defaults to `100`.
    pub size: usize,
}
1562
/// Webhook retry settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`WebhookRetryConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebhookRetryConfig {
    /// Defaults to `true`.
    pub enabled: bool,
    /// Defaults to `None`.
    pub max_attempts: Option<u32>,
    /// Defaults to `300_000`.
    pub max_elapsed_time_ms: u64,
    /// Defaults to `1_000`.
    pub initial_backoff_ms: u64,
    /// Defaults to `60_000`.
    pub max_backoff_ms: u64,
}
1572
1573impl Default for WebhooksConfig {
1574 fn default() -> Self {
1575 Self {
1576 batching: BatchingConfig::default(),
1577 retry: WebhookRetryConfig::default(),
1578 request_timeout_ms: 10_000,
1579 }
1580 }
1581}
1582
1583impl Default for WebhookRetryConfig {
1584 fn default() -> Self {
1585 Self {
1586 enabled: true,
1587 max_attempts: None,
1588 max_elapsed_time_ms: 300_000,
1589 initial_backoff_ms: 1_000,
1590 max_backoff_ms: 60_000,
1591 }
1592 }
1593}
1594
/// Cluster health-check settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`ClusterHealthConfig::default`]. `ClusterHealthConfig::validate` requires
/// all three intervals to be non-zero, `heartbeat_interval_ms` to be at most
/// `node_timeout_ms / 3`, and `cleanup_interval_ms` to be at most
/// `node_timeout_ms`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ClusterHealthConfig {
    /// Defaults to `true`.
    pub enabled: bool,
    /// Defaults to `10000`.
    pub heartbeat_interval_ms: u64,
    /// Defaults to `30000`.
    pub node_timeout_ms: u64,
    /// Defaults to `10000`.
    pub cleanup_interval_ms: u64,
}
1603
/// Unix-socket listener settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`UnixSocketConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct UnixSocketConfig {
    /// Defaults to `false`.
    pub enabled: bool,
    /// Defaults to `"/var/run/sockudo/sockudo.sock"`.
    pub path: String,
    /// Permission bits; defaults to `0o660`.
    ///
    /// Deserialized through `deserialize_octal_permission`, which reads a string
    /// of octal digits (at most `777`).
    /// NOTE(review): serialization uses the derived numeric form, so a serialized
    /// value does not appear to round-trip through this deserializer — confirm.
    #[serde(deserialize_with = "deserialize_octal_permission")]
    pub permission_mode: u32,
}
1612
/// Delta-compression settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`DeltaCompressionOptionsConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DeltaCompressionOptionsConfig {
    /// Defaults to `true`.
    pub enabled: bool,
    /// Defaults to `"fossil"`.
    pub algorithm: String,
    /// Defaults to `10`.
    pub full_message_interval: u32,
    /// Defaults to `100`.
    pub min_message_size: usize,
    /// Defaults to `300`.
    pub max_state_age_secs: u64,
    /// Defaults to `100`.
    pub max_channel_states_per_socket: usize,
    /// Defaults to `Some(100)`.
    pub max_conflation_states_per_channel: Option<usize>,
    /// Defaults to `None`.
    pub conflation_key_path: Option<String>,
    /// Defaults to `false`.
    pub cluster_coordination: bool,
    /// Defaults to `DeltaCoordinationBackend::Auto`.
    pub coordination_backend: DeltaCoordinationBackend,
    /// Defaults to `false`.
    pub omit_delta_algorithm: bool,
}
1628
/// Cleanup worker settings.
///
/// `#[serde(default)]`: fields missing from the input are taken from
/// [`CleanupConfig::default`]. `CleanupConfig::validate` rejects zero values
/// for `queue_buffer_size`, `batch_size`, `batch_timeout_ms` and a fixed
/// `worker_threads` count.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CleanupConfig {
    /// Defaults to `1024`.
    pub queue_buffer_size: usize,
    /// Defaults to `64`.
    pub batch_size: usize,
    /// Defaults to `100`.
    pub batch_timeout_ms: u64,
    /// Defaults to `WorkerThreadsConfig::Auto`.
    pub worker_threads: WorkerThreadsConfig,
    /// Defaults to `3`.
    pub max_retry_attempts: u32,
    /// Defaults to `true`.
    pub async_enabled: bool,
    /// Defaults to `true`.
    pub fallback_to_sync: bool,
}
1641
/// Worker-thread count: either chosen automatically or fixed.
///
/// The hand-written serde impls represent `Auto` as the string `"auto"` and
/// `Fixed(n)` as the integer `n`.
#[derive(Debug, Clone)]
pub enum WorkerThreadsConfig {
    Auto,
    Fixed(usize),
}
1648
1649impl serde::Serialize for WorkerThreadsConfig {
1650 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1651 where
1652 S: serde::Serializer,
1653 {
1654 match self {
1655 WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1656 WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1657 }
1658 }
1659}
1660
1661impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1662 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1663 where
1664 D: serde::Deserializer<'de>,
1665 {
1666 use serde::de;
1667 struct WorkerThreadsVisitor;
1668 impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1669 type Value = WorkerThreadsConfig;
1670
1671 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1672 formatter.write_str(r#""auto" or a positive integer"#)
1673 }
1674
1675 fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1676 if value.eq_ignore_ascii_case("auto") {
1677 Ok(WorkerThreadsConfig::Auto)
1678 } else if let Ok(n) = value.parse::<usize>() {
1679 Ok(WorkerThreadsConfig::Fixed(n))
1680 } else {
1681 Err(E::custom(format!(
1682 "expected 'auto' or a number, got '{value}'"
1683 )))
1684 }
1685 }
1686
1687 fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1688 Ok(WorkerThreadsConfig::Fixed(value as usize))
1689 }
1690
1691 fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1692 if value >= 0 {
1693 Ok(WorkerThreadsConfig::Fixed(value as usize))
1694 } else {
1695 Err(E::custom("worker_threads must be non-negative"))
1696 }
1697 }
1698 }
1699 deserializer.deserialize_any(WorkerThreadsVisitor)
1700 }
1701}
1702
1703impl Default for CleanupConfig {
1704 fn default() -> Self {
1705 Self {
1706 queue_buffer_size: 1024,
1707 batch_size: 64,
1708 batch_timeout_ms: 100,
1709 worker_threads: WorkerThreadsConfig::Auto,
1710 max_retry_attempts: 3,
1711 async_enabled: true,
1712 fallback_to_sync: true,
1713 }
1714 }
1715}
1716
1717impl CleanupConfig {
1718 pub fn validate(&self) -> Result<(), String> {
1719 if self.queue_buffer_size == 0 {
1720 return Err("queue_buffer_size must be greater than 0".to_string());
1721 }
1722 if self.batch_size == 0 {
1723 return Err("batch_size must be greater than 0".to_string());
1724 }
1725 if self.batch_timeout_ms == 0 {
1726 return Err("batch_timeout_ms must be greater than 0".to_string());
1727 }
1728 if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1729 && n == 0
1730 {
1731 return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1732 }
1733 Ok(())
1734 }
1735}
1736
1737#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1738#[serde(default)]
1739pub struct TagFilteringConfig {
1740 #[serde(default)]
1741 pub enabled: bool,
1742 #[serde(default = "default_true")]
1743 pub enable_tags: bool,
1744}
1745
/// Serde default helper that always returns `true`; referenced by
/// `#[serde(default = "default_true")]` on `TagFilteringConfig::enable_tags`.
fn default_true() -> bool {
    true
}
1749
1750impl Default for ServerOptions {
1753 fn default() -> Self {
1754 Self {
1755 adapter: AdapterConfig::default(),
1756 app_manager: AppManagerConfig::default(),
1757 cache: CacheConfig::default(),
1758 channel_limits: ChannelLimits::default(),
1759 cors: CorsConfig::default(),
1760 database: DatabaseConfig::default(),
1761 database_pooling: DatabasePooling::default(),
1762 debug: false,
1763 tag_filtering: TagFilteringConfig::default(),
1764 event_limits: EventLimits::default(),
1765 host: "0.0.0.0".to_string(),
1766 http_api: HttpApiConfig::default(),
1767 instance: InstanceConfig::default(),
1768 logging: None,
1769 metrics: MetricsConfig::default(),
1770 mode: "production".to_string(),
1771 port: 6001,
1772 path_prefix: "/".to_string(),
1773 presence: PresenceConfig::default(),
1774 queue: QueueConfig::default(),
1775 rate_limiter: RateLimiterConfig::default(),
1776 shutdown_grace_period: 10,
1777 ssl: SslConfig::default(),
1778 user_authentication_timeout: 3600,
1779 webhooks: WebhooksConfig::default(),
1780 websocket_max_payload_kb: 64,
1781 cleanup: CleanupConfig::default(),
1782 activity_timeout: 120,
1783 cluster_health: ClusterHealthConfig::default(),
1784 unix_socket: UnixSocketConfig::default(),
1785 delta_compression: DeltaCompressionOptionsConfig::default(),
1786 websocket: WebSocketConfig::default(),
1787 connection_recovery: ConnectionRecoveryConfig::default(),
1788 history: HistoryConfig::default(),
1789 presence_history: PresenceHistoryConfig::default(),
1790 idempotency: IdempotencyConfig::default(),
1791 ephemeral: EphemeralConfig::default(),
1792 echo_control: EchoControlConfig::default(),
1793 event_name_filtering: EventNameFilteringConfig::default(),
1794 versioned_messages: VersionedMessagesConfig::default(),
1795 }
1796 }
1797}
1798
1799impl Default for SqsQueueConfig {
1800 fn default() -> Self {
1801 Self {
1802 region: "us-east-1".to_string(),
1803 queue_url_prefix: None,
1804 visibility_timeout: 30,
1805 endpoint_url: None,
1806 max_messages: 10,
1807 wait_time_seconds: 5,
1808 concurrency: 5,
1809 fifo: false,
1810 message_group_id: Some("default".to_string()),
1811 }
1812 }
1813}
1814
1815impl Default for SnsQueueConfig {
1816 fn default() -> Self {
1817 Self {
1818 region: "us-east-1".to_string(),
1819 topic_arn: String::new(),
1820 endpoint_url: None,
1821 }
1822 }
1823}
1824
1825impl Default for RedisAdapterConfig {
1826 fn default() -> Self {
1827 Self {
1828 requests_timeout: 5000,
1829 prefix: "sockudo_adapter:".to_string(),
1830 redis_pub_options: AHashMap::new(),
1831 redis_sub_options: AHashMap::new(),
1832 cluster_mode: false,
1833 }
1834 }
1835}
1836
1837impl Default for RedisClusterAdapterConfig {
1838 fn default() -> Self {
1839 Self {
1840 nodes: vec![],
1841 prefix: "sockudo_adapter:".to_string(),
1842 request_timeout_ms: 1000,
1843 use_connection_manager: true,
1844 use_sharded_pubsub: false,
1845 }
1846 }
1847}
1848
1849impl Default for NatsAdapterConfig {
1850 fn default() -> Self {
1851 Self {
1852 servers: vec!["nats://localhost:4222".to_string()],
1853 prefix: "sockudo_adapter:".to_string(),
1854 request_timeout_ms: 5000,
1855 username: None,
1856 password: None,
1857 token: None,
1858 connection_timeout_ms: 5000,
1859 nodes_number: None,
1860 discovery_max_wait_ms: 1000,
1861 discovery_idle_wait_ms: 150,
1862 }
1863 }
1864}
1865
1866impl Default for PulsarAdapterConfig {
1867 fn default() -> Self {
1868 Self {
1869 url: "pulsar://127.0.0.1:6650".to_string(),
1870 prefix: "sockudo-adapter".to_string(),
1871 request_timeout_ms: 5000,
1872 token: None,
1873 nodes_number: None,
1874 }
1875 }
1876}
1877
1878impl Default for RabbitMqAdapterConfig {
1879 fn default() -> Self {
1880 Self {
1881 url: "amqp://guest:guest@127.0.0.1:5672/%2f".to_string(),
1882 prefix: "sockudo_adapter".to_string(),
1883 request_timeout_ms: 5000,
1884 connection_timeout_ms: 5000,
1885 nodes_number: None,
1886 }
1887 }
1888}
1889
1890impl Default for GooglePubSubAdapterConfig {
1891 fn default() -> Self {
1892 Self {
1893 project_id: "".to_string(),
1894 prefix: "sockudo-adapter".to_string(),
1895 request_timeout_ms: 5000,
1896 emulator_host: None,
1897 nodes_number: None,
1898 }
1899 }
1900}
1901
1902impl Default for KafkaAdapterConfig {
1903 fn default() -> Self {
1904 Self {
1905 brokers: vec!["localhost:9092".to_string()],
1906 prefix: "sockudo_adapter".to_string(),
1907 request_timeout_ms: 5000,
1908 security_protocol: None,
1909 sasl_mechanism: None,
1910 sasl_username: None,
1911 sasl_password: None,
1912 nodes_number: None,
1913 }
1914 }
1915}
1916
1917impl Default for CacheSettings {
1918 fn default() -> Self {
1919 Self {
1920 enabled: true,
1921 ttl: 300,
1922 }
1923 }
1924}
1925
1926impl Default for MemoryCacheOptions {
1927 fn default() -> Self {
1928 Self {
1929 ttl: 300,
1930 cleanup_interval: 60,
1931 max_capacity: 10000,
1932 }
1933 }
1934}
1935
1936impl Default for CacheConfig {
1937 fn default() -> Self {
1938 Self {
1939 driver: CacheDriver::default(),
1940 redis: RedisConfig {
1941 prefix: Some("sockudo_cache:".to_string()),
1942 url_override: None,
1943 cluster_mode: false,
1944 },
1945 memory: MemoryCacheOptions::default(),
1946 }
1947 }
1948}
1949
1950impl Default for ChannelLimits {
1951 fn default() -> Self {
1952 Self {
1953 max_name_length: 200,
1954 cache_ttl: 3600,
1955 }
1956 }
1957}
1958impl Default for CorsConfig {
1959 fn default() -> Self {
1960 Self {
1961 credentials: true,
1962 origin: vec!["*".to_string()],
1963 methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
1964 allowed_headers: vec![
1965 "Authorization".to_string(),
1966 "Content-Type".to_string(),
1967 "X-Requested-With".to_string(),
1968 "Accept".to_string(),
1969 ],
1970 }
1971 }
1972}
1973
1974impl Default for DatabaseConnection {
1975 fn default() -> Self {
1976 Self {
1977 host: "localhost".to_string(),
1978 port: 3306,
1979 username: "root".to_string(),
1980 password: "".to_string(),
1981 database: "sockudo".to_string(),
1982 table_name: "applications".to_string(),
1983 connection_pool_size: 10,
1984 pool_min: None,
1985 pool_max: None,
1986 cache_ttl: 300,
1987 cache_cleanup_interval: 60,
1988 cache_max_capacity: 100,
1989 }
1990 }
1991}
1992
1993impl Default for RedisConnection {
1994 fn default() -> Self {
1995 Self {
1996 host: "127.0.0.1".to_string(),
1997 port: 6379,
1998 db: 0,
1999 username: None,
2000 password: None,
2001 key_prefix: "sockudo:".to_string(),
2002 sentinels: Vec::new(),
2003 sentinel_password: None,
2004 name: "mymaster".to_string(),
2005 cluster: RedisClusterConnection::default(),
2006 cluster_nodes: Vec::new(),
2007 }
2008 }
2009}
2010
2011impl Default for RedisSentinel {
2012 fn default() -> Self {
2013 Self {
2014 host: "localhost".to_string(),
2015 port: 26379,
2016 }
2017 }
2018}
2019
2020impl Default for ClusterNode {
2021 fn default() -> Self {
2022 Self {
2023 host: "127.0.0.1".to_string(),
2024 port: 7000,
2025 }
2026 }
2027}
2028
2029impl Default for DatabasePooling {
2030 fn default() -> Self {
2031 Self {
2032 enabled: true,
2033 min: 2,
2034 max: 10,
2035 }
2036 }
2037}
2038
2039impl Default for EventLimits {
2040 fn default() -> Self {
2041 Self {
2042 max_channels_at_once: 100,
2043 max_name_length: 200,
2044 max_payload_in_kb: 100,
2045 max_batch_size: 10,
2046 }
2047 }
2048}
2049
2050impl Default for IdempotencyConfig {
2051 fn default() -> Self {
2052 Self {
2053 enabled: true,
2054 ttl_seconds: 120,
2055 max_key_length: 128,
2056 }
2057 }
2058}
2059
2060impl Default for EphemeralConfig {
2061 fn default() -> Self {
2062 Self { enabled: true }
2063 }
2064}
2065
2066impl Default for EchoControlConfig {
2067 fn default() -> Self {
2068 Self {
2069 enabled: true,
2070 default_echo_messages: true,
2071 }
2072 }
2073}
2074
2075impl Default for EventNameFilteringConfig {
2076 fn default() -> Self {
2077 Self {
2078 enabled: true,
2079 max_events_per_filter: 50,
2080 max_event_name_length: 200,
2081 }
2082 }
2083}
2084
2085impl Default for ConnectionRecoveryConfig {
2086 fn default() -> Self {
2087 Self {
2088 enabled: false,
2089 buffer_ttl_seconds: 120,
2090 max_buffer_size: 100,
2091 }
2092 }
2093}
2094
2095impl Default for HttpApiConfig {
2096 fn default() -> Self {
2097 Self {
2098 request_limit_in_mb: 10,
2099 accept_traffic: AcceptTraffic::default(),
2100 usage_enabled: true,
2101 }
2102 }
2103}
2104
2105impl Default for AcceptTraffic {
2106 fn default() -> Self {
2107 Self {
2108 memory_threshold: 0.90,
2109 }
2110 }
2111}
2112
2113impl Default for InstanceConfig {
2114 fn default() -> Self {
2115 Self {
2116 process_id: uuid::Uuid::new_v4().to_string(),
2117 }
2118 }
2119}
2120
2121impl Default for LoggingConfig {
2122 fn default() -> Self {
2123 Self {
2124 colors_enabled: true,
2125 include_target: true,
2126 }
2127 }
2128}
2129
2130impl Default for MetricsConfig {
2131 fn default() -> Self {
2132 Self {
2133 enabled: true,
2134 driver: MetricsDriver::default(),
2135 host: "0.0.0.0".to_string(),
2136 prometheus: PrometheusConfig::default(),
2137 port: 9601,
2138 }
2139 }
2140}
2141
2142impl Default for PrometheusConfig {
2143 fn default() -> Self {
2144 Self {
2145 prefix: "sockudo_".to_string(),
2146 }
2147 }
2148}
2149
2150impl Default for PresenceConfig {
2151 fn default() -> Self {
2152 Self {
2153 max_members_per_channel: 100,
2154 max_member_size_in_kb: 2,
2155 }
2156 }
2157}
2158
2159impl Default for RedisQueueConfig {
2160 fn default() -> Self {
2161 Self {
2162 concurrency: 5,
2163 prefix: Some("sockudo_queue:".to_string()),
2164 url_override: None,
2165 cluster_mode: false,
2166 }
2167 }
2168}
2169
2170impl Default for RateLimit {
2171 fn default() -> Self {
2172 Self {
2173 max_requests: 60,
2174 window_seconds: 60,
2175 identifier: Some("default".to_string()),
2176 trust_hops: Some(0),
2177 }
2178 }
2179}
2180
2181impl Default for RateLimiterConfig {
2182 fn default() -> Self {
2183 Self {
2184 enabled: true,
2185 driver: CacheDriver::Memory,
2186 api_rate_limit: RateLimit {
2187 max_requests: 100,
2188 window_seconds: 60,
2189 identifier: Some("api".to_string()),
2190 trust_hops: Some(0),
2191 },
2192 websocket_rate_limit: RateLimit {
2193 max_requests: 20,
2194 window_seconds: 60,
2195 identifier: Some("websocket_connect".to_string()),
2196 trust_hops: Some(0),
2197 },
2198 redis: RedisConfig {
2199 prefix: Some("sockudo_rl:".to_string()),
2200 url_override: None,
2201 cluster_mode: false,
2202 },
2203 }
2204 }
2205}
2206
2207impl Default for SslConfig {
2208 fn default() -> Self {
2209 Self {
2210 enabled: false,
2211 cert_path: "".to_string(),
2212 key_path: "".to_string(),
2213 passphrase: None,
2214 ca_path: None,
2215 redirect_http: false,
2216 http_port: Some(80),
2217 }
2218 }
2219}
2220
2221impl Default for BatchingConfig {
2222 fn default() -> Self {
2223 Self {
2224 enabled: true,
2225 duration: 50,
2226 size: 100,
2227 }
2228 }
2229}
2230
2231impl Default for ClusterHealthConfig {
2232 fn default() -> Self {
2233 Self {
2234 enabled: true,
2235 heartbeat_interval_ms: 10000,
2236 node_timeout_ms: 30000,
2237 cleanup_interval_ms: 10000,
2238 }
2239 }
2240}
2241
2242impl Default for UnixSocketConfig {
2243 fn default() -> Self {
2244 Self {
2245 enabled: false,
2246 path: "/var/run/sockudo/sockudo.sock".to_string(),
2247 permission_mode: 0o660,
2248 }
2249 }
2250}
2251
2252impl Default for DeltaCompressionOptionsConfig {
2253 fn default() -> Self {
2254 Self {
2255 enabled: true,
2256 algorithm: "fossil".to_string(),
2257 full_message_interval: 10,
2258 min_message_size: 100,
2259 max_state_age_secs: 300,
2260 max_channel_states_per_socket: 100,
2261 max_conflation_states_per_channel: Some(100),
2262 conflation_key_path: None,
2263 cluster_coordination: false,
2264 coordination_backend: DeltaCoordinationBackend::Auto,
2265 omit_delta_algorithm: false,
2266 }
2267 }
2268}
2269
#[cfg(test)]
mod tests {
    use super::{DeltaCoordinationBackend, QueueDriver, ServerOptions, VersionStoreDriver};
    use std::str::FromStr;

    /// `QueueDriver::from_str` recognizes the broker-backed driver names.
    #[test]
    fn queue_driver_parses_broker_backends() {
        assert_eq!(
            QueueDriver::from_str("rabbitmq").unwrap(),
            QueueDriver::RabbitMq
        );
        assert_eq!(QueueDriver::from_str("kafka").unwrap(), QueueDriver::Kafka);
        assert_eq!(
            QueueDriver::from_str("pulsar").unwrap(),
            QueueDriver::Pulsar
        );
        assert_eq!(
            QueueDriver::from_str("google-pubsub").unwrap(),
            QueueDriver::GooglePubSub
        );
    }

    /// `DeltaCoordinationBackend::from_str` accepts `auto`, `redis-cluster` and `nats`.
    #[test]
    fn delta_coordination_backend_parses_expected_values() {
        assert_eq!(
            DeltaCoordinationBackend::from_str("auto").unwrap(),
            DeltaCoordinationBackend::Auto
        );
        assert_eq!(
            DeltaCoordinationBackend::from_str("redis-cluster").unwrap(),
            DeltaCoordinationBackend::RedisCluster
        );
        assert_eq!(
            DeltaCoordinationBackend::from_str("nats").unwrap(),
            DeltaCoordinationBackend::Nats
        );
    }

    /// Setting `VERSIONED_MESSAGES_DRIVER=postgres` makes `override_from_env`
    /// select the Postgres version store. The previous value of the variable is
    /// restored before the assertion runs.
    #[tokio::test]
    async fn versioned_messages_driver_overrides_from_env() {
        // Remember the current value so it can be put back afterwards.
        let previous = std::env::var("VERSIONED_MESSAGES_DRIVER").ok();
        // SAFETY: assumes no other thread reads or writes the process environment
        // while this test runs — NOTE(review): confirm for parallel test runs.
        unsafe { std::env::set_var("VERSIONED_MESSAGES_DRIVER", "postgres") };

        let mut options = ServerOptions::default();
        options.override_from_env().await.unwrap();

        // Restore the environment before asserting so a failed assertion does not
        // leave the override in place.
        if let Some(previous) = previous {
            // SAFETY: same assumption as for the `set_var` call above.
            unsafe { std::env::set_var("VERSIONED_MESSAGES_DRIVER", previous) };
        } else {
            // SAFETY: same assumption as for the `set_var` call above.
            unsafe { std::env::remove_var("VERSIONED_MESSAGES_DRIVER") };
        }

        assert_eq!(
            options.versioned_messages.driver,
            VersionStoreDriver::Postgres
        );
    }
}
2332
2333impl ClusterHealthConfig {
2334 pub fn validate(&self) -> Result<(), String> {
2335 if self.heartbeat_interval_ms == 0 {
2336 return Err("heartbeat_interval_ms must be greater than 0".to_string());
2337 }
2338 if self.node_timeout_ms == 0 {
2339 return Err("node_timeout_ms must be greater than 0".to_string());
2340 }
2341 if self.cleanup_interval_ms == 0 {
2342 return Err("cleanup_interval_ms must be greater than 0".to_string());
2343 }
2344
2345 if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
2346 return Err(format!(
2347 "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
2348 self.heartbeat_interval_ms,
2349 self.node_timeout_ms,
2350 self.node_timeout_ms / 3
2351 ));
2352 }
2353
2354 if self.cleanup_interval_ms > self.node_timeout_ms {
2355 return Err(format!(
2356 "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
2357 self.cleanup_interval_ms, self.node_timeout_ms
2358 ));
2359 }
2360
2361 Ok(())
2362 }
2363}
2364
2365impl ServerOptions {
2366 pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
2367 let content = tokio::fs::read_to_string(path).await?;
2368 let options: Self = if path.ends_with(".toml") {
2369 toml::from_str(&content)?
2370 } else {
2371 sonic_rs::from_str(&content)?
2373 };
2374 Ok(options)
2375 }
2376
2377 pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
2378 if let Ok(mode) = std::env::var("ENVIRONMENT") {
2380 self.mode = mode;
2381 }
2382 self.debug = parse_bool_env("DEBUG_MODE", self.debug);
2383 if parse_bool_env("DEBUG", false) {
2384 self.debug = true;
2385 info!("DEBUG environment variable forces debug mode ON");
2386 }
2387
2388 self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
2389
2390 if let Ok(host) = std::env::var("HOST") {
2391 self.host = host;
2392 }
2393 self.port = parse_env::<u16>("PORT", self.port);
2394 self.shutdown_grace_period =
2395 parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
2396 self.user_authentication_timeout = parse_env::<u64>(
2397 "USER_AUTHENTICATION_TIMEOUT",
2398 self.user_authentication_timeout,
2399 );
2400 self.websocket_max_payload_kb =
2401 parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
2402 if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
2403 self.instance.process_id = id;
2404 }
2405
2406 if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
2408 self.adapter.driver =
2409 parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
2410 }
2411 self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
2412 "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
2413 self.adapter.buffer_multiplier_per_cpu,
2414 );
2415 self.adapter.enable_socket_counting = parse_env::<bool>(
2416 "ADAPTER_ENABLE_SOCKET_COUNTING",
2417 self.adapter.enable_socket_counting,
2418 );
2419 self.adapter.fallback_to_local =
2420 parse_env::<bool>("ADAPTER_FALLBACK_TO_LOCAL", self.adapter.fallback_to_local);
2421 if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
2422 self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
2423 }
2424 if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
2425 self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
2426 }
2427 if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
2428 self.app_manager.driver =
2429 parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
2430 }
2431 if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
2432 self.rate_limiter.driver = parse_driver_enum(
2433 driver_str,
2434 self.rate_limiter.driver.clone(),
2435 "RateLimiter Backend",
2436 );
2437 }
2438
2439 if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
2441 self.database.redis.host = host;
2442 }
2443 self.database.redis.port =
2444 parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
2445 if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
2446 self.database.redis.username = if username.is_empty() {
2447 None
2448 } else {
2449 Some(username)
2450 };
2451 }
2452 if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
2453 self.database.redis.password = Some(password);
2454 }
2455 self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
2456 if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
2457 self.database.redis.key_prefix = prefix;
2458 }
2459 if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
2460 self.database.redis.cluster.username = if cluster_username.is_empty() {
2461 None
2462 } else {
2463 Some(cluster_username)
2464 };
2465 }
2466 if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
2467 self.database.redis.cluster.password = Some(cluster_password);
2468 }
2469 self.database.redis.cluster.use_tls = parse_bool_env(
2470 "DATABASE_REDIS_CLUSTER_USE_TLS",
2471 self.database.redis.cluster.use_tls,
2472 );
2473
2474 if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
2476 self.database.mysql.host = host;
2477 }
2478 self.database.mysql.port =
2479 parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
2480 if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
2481 self.database.mysql.username = user;
2482 }
2483 if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
2484 self.database.mysql.password = pass;
2485 }
2486 if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
2487 self.database.mysql.database = db;
2488 }
2489 if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
2490 self.database.mysql.table_name = table;
2491 }
2492 override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
2493
2494 if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
2496 self.database.postgres.host = host;
2497 }
2498 self.database.postgres.port =
2499 parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
2500 if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
2501 self.database.postgres.username = user;
2502 }
2503 if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
2504 self.database.postgres.password = pass;
2505 }
2506 if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
2507 self.database.postgres.database = db;
2508 }
2509 override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
2510
2511 if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
2513 self.database.dynamodb.region = region;
2514 }
2515 if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
2516 self.database.dynamodb.table_name = table;
2517 }
2518 if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
2519 self.database.dynamodb.endpoint_url = Some(endpoint);
2520 }
2521 if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
2522 self.database.dynamodb.aws_access_key_id = Some(key_id);
2523 }
2524 if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
2525 self.database.dynamodb.aws_secret_access_key = Some(secret);
2526 }
2527
2528 if let Ok(url) = std::env::var("DATABASE_SURREALDB_URL") {
2530 self.database.surrealdb.url = url;
2531 }
2532 if let Ok(namespace) = std::env::var("DATABASE_SURREALDB_NAMESPACE") {
2533 self.database.surrealdb.namespace = namespace;
2534 }
2535 if let Ok(database) = std::env::var("DATABASE_SURREALDB_DATABASE") {
2536 self.database.surrealdb.database = database;
2537 }
2538 if let Ok(username) = std::env::var("DATABASE_SURREALDB_USERNAME") {
2539 self.database.surrealdb.username = username;
2540 }
2541 if let Ok(password) = std::env::var("DATABASE_SURREALDB_PASSWORD") {
2542 self.database.surrealdb.password = password;
2543 }
2544 if let Ok(table) = std::env::var("DATABASE_SURREALDB_TABLE_NAME") {
2545 self.database.surrealdb.table_name = table;
2546 }
2547
2548 let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
2550 let node_list: Vec<String> = nodes
2551 .split(',')
2552 .map(|s| s.trim())
2553 .filter(|s| !s.is_empty())
2554 .map(ToString::to_string)
2555 .collect();
2556
2557 options.adapter.cluster.nodes = node_list.clone();
2558 options.queue.redis_cluster.nodes = node_list.clone();
2559
2560 let parsed_nodes: Vec<ClusterNode> = node_list
2561 .iter()
2562 .filter_map(|seed| ClusterNode::from_seed(seed))
2563 .collect();
2564 options.database.redis.cluster.nodes = parsed_nodes.clone();
2565 options.database.redis.cluster_nodes = parsed_nodes;
2566 };
2567
2568 if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
2569 apply_redis_cluster_nodes(self, &nodes);
2570 }
2571 if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
2572 apply_redis_cluster_nodes(self, &nodes);
2573 }
2574 self.queue.redis_cluster.concurrency = parse_env::<u32>(
2575 "REDIS_CLUSTER_QUEUE_CONCURRENCY",
2576 self.queue.redis_cluster.concurrency,
2577 );
2578 if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
2579 self.queue.redis_cluster.prefix = Some(prefix);
2580 }
2581
2582 self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
2584 if let Ok(val) = std::env::var("SSL_CERT_PATH") {
2585 self.ssl.cert_path = val;
2586 }
2587 if let Ok(val) = std::env::var("SSL_KEY_PATH") {
2588 self.ssl.key_path = val;
2589 }
2590 self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
2591 if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
2592 self.ssl.http_port = Some(port);
2593 }
2594
2595 self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
2597 if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
2598 self.unix_socket.path = path;
2599 }
2600 if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
2601 if mode_str.chars().all(|c| c.is_digit(8)) {
2602 if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
2603 if mode <= 0o777 {
2604 self.unix_socket.permission_mode = mode;
2605 } else {
2606 warn!(
2607 "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
2608 mode_str, self.unix_socket.permission_mode
2609 );
2610 }
2611 } else {
2612 warn!(
2613 "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
2614 mode_str, self.unix_socket.permission_mode
2615 );
2616 }
2617 } else {
2618 warn!(
2619 "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
2620 mode_str, self.unix_socket.permission_mode
2621 );
2622 }
2623 }
2624
2625 if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
2627 self.metrics.driver =
2628 parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
2629 }
2630 self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
2631 if let Ok(val) = std::env::var("METRICS_HOST") {
2632 self.metrics.host = val;
2633 }
2634 self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
2635 if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
2636 self.metrics.prometheus.prefix = val;
2637 }
2638
2639 self.http_api.usage_enabled =
2641 parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
2642
2643 self.rate_limiter.enabled =
2645 parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
2646 self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
2647 "RATE_LIMITER_API_MAX_REQUESTS",
2648 self.rate_limiter.api_rate_limit.max_requests,
2649 );
2650 self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
2651 "RATE_LIMITER_API_WINDOW_SECONDS",
2652 self.rate_limiter.api_rate_limit.window_seconds,
2653 );
2654 if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
2655 self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
2656 }
2657 self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
2658 "RATE_LIMITER_WS_MAX_REQUESTS",
2659 self.rate_limiter.websocket_rate_limit.max_requests,
2660 );
2661 self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
2662 "RATE_LIMITER_WS_WINDOW_SECONDS",
2663 self.rate_limiter.websocket_rate_limit.window_seconds,
2664 );
2665 if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
2666 self.rate_limiter.redis.prefix = Some(prefix);
2667 }
2668
2669 self.queue.redis.concurrency =
2671 parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
2672 if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
2673 self.queue.redis.prefix = Some(prefix);
2674 }
2675
2676 if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
2678 self.queue.sqs.region = region;
2679 }
2680 self.queue.sqs.visibility_timeout = parse_env::<i32>(
2681 "QUEUE_SQS_VISIBILITY_TIMEOUT",
2682 self.queue.sqs.visibility_timeout,
2683 );
2684 self.queue.sqs.max_messages =
2685 parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
2686 self.queue.sqs.wait_time_seconds = parse_env::<i32>(
2687 "QUEUE_SQS_WAIT_TIME_SECONDS",
2688 self.queue.sqs.wait_time_seconds,
2689 );
2690 self.queue.sqs.concurrency =
2691 parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
2692 self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
2693 if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
2694 self.queue.sqs.endpoint_url = Some(endpoint);
2695 }
2696
2697 if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
2699 self.queue.sns.region = region;
2700 }
2701 if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
2702 self.queue.sns.topic_arn = topic_arn;
2703 }
2704 if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
2705 self.queue.sns.endpoint_url = Some(endpoint);
2706 }
2707
2708 self.webhooks.batching.enabled =
2710 parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
2711 self.webhooks.batching.duration =
2712 parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
2713 self.webhooks.batching.size =
2714 parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
2715
2716 if let Ok(servers) = std::env::var("NATS_SERVERS") {
2718 self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2719 }
2720 if let Ok(user) = std::env::var("NATS_USERNAME") {
2721 self.adapter.nats.username = Some(user);
2722 }
2723 if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2724 self.adapter.nats.password = Some(pass);
2725 }
2726 if let Ok(token) = std::env::var("NATS_TOKEN") {
2727 self.adapter.nats.token = Some(token);
2728 }
2729 if let Ok(prefix) = std::env::var("NATS_PREFIX") {
2730 self.adapter.nats.prefix = prefix;
2731 }
2732 self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2733 "NATS_CONNECTION_TIMEOUT_MS",
2734 self.adapter.nats.connection_timeout_ms,
2735 );
2736 self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2737 "NATS_REQUEST_TIMEOUT_MS",
2738 self.adapter.nats.request_timeout_ms,
2739 );
2740 self.adapter.nats.discovery_max_wait_ms = parse_env::<u64>(
2741 "NATS_DISCOVERY_MAX_WAIT_MS",
2742 self.adapter.nats.discovery_max_wait_ms,
2743 );
2744 self.adapter.nats.discovery_idle_wait_ms = parse_env::<u64>(
2745 "NATS_DISCOVERY_IDLE_WAIT_MS",
2746 self.adapter.nats.discovery_idle_wait_ms,
2747 );
2748 if let Some(nodes) = parse_env_optional::<u32>("NATS_NODES_NUMBER") {
2749 self.adapter.nats.nodes_number = Some(nodes);
2750 }
2751
2752 if let Ok(url) = std::env::var("PULSAR_URL") {
2754 self.adapter.pulsar.url = url;
2755 }
2756 if let Ok(prefix) = std::env::var("PULSAR_PREFIX") {
2757 self.adapter.pulsar.prefix = prefix;
2758 }
2759 if let Ok(token) = std::env::var("PULSAR_TOKEN") {
2760 self.adapter.pulsar.token = Some(token);
2761 }
2762 self.adapter.pulsar.request_timeout_ms = parse_env::<u64>(
2763 "PULSAR_REQUEST_TIMEOUT_MS",
2764 self.adapter.pulsar.request_timeout_ms,
2765 );
2766 if let Some(nodes) = parse_env_optional::<u32>("PULSAR_NODES_NUMBER") {
2767 self.adapter.pulsar.nodes_number = Some(nodes);
2768 }
2769
2770 if let Ok(url) = std::env::var("RABBITMQ_URL") {
2772 self.adapter.rabbitmq.url = url;
2773 }
2774 if let Ok(prefix) = std::env::var("RABBITMQ_PREFIX") {
2775 self.adapter.rabbitmq.prefix = prefix;
2776 }
2777 self.adapter.rabbitmq.connection_timeout_ms = parse_env::<u64>(
2778 "RABBITMQ_CONNECTION_TIMEOUT_MS",
2779 self.adapter.rabbitmq.connection_timeout_ms,
2780 );
2781 self.adapter.rabbitmq.request_timeout_ms = parse_env::<u64>(
2782 "RABBITMQ_REQUEST_TIMEOUT_MS",
2783 self.adapter.rabbitmq.request_timeout_ms,
2784 );
2785 if let Some(nodes) = parse_env_optional::<u32>("RABBITMQ_NODES_NUMBER") {
2786 self.adapter.rabbitmq.nodes_number = Some(nodes);
2787 }
2788
2789 if let Ok(project_id) = std::env::var("GOOGLE_PUBSUB_PROJECT_ID") {
2791 self.adapter.google_pubsub.project_id = project_id;
2792 }
2793 if let Ok(prefix) = std::env::var("GOOGLE_PUBSUB_PREFIX") {
2794 self.adapter.google_pubsub.prefix = prefix;
2795 }
2796 if let Ok(emulator_host) = std::env::var("PUBSUB_EMULATOR_HOST") {
2797 self.adapter.google_pubsub.emulator_host = Some(emulator_host);
2798 }
2799 self.adapter.google_pubsub.request_timeout_ms = parse_env::<u64>(
2800 "GOOGLE_PUBSUB_REQUEST_TIMEOUT_MS",
2801 self.adapter.google_pubsub.request_timeout_ms,
2802 );
2803 if let Some(nodes) = parse_env_optional::<u32>("GOOGLE_PUBSUB_NODES_NUMBER") {
2804 self.adapter.google_pubsub.nodes_number = Some(nodes);
2805 }
2806
2807 if let Ok(brokers) = std::env::var("KAFKA_BROKERS") {
2809 self.adapter.kafka.brokers = brokers.split(',').map(|s| s.trim().to_string()).collect();
2810 }
2811 if let Ok(prefix) = std::env::var("KAFKA_PREFIX") {
2812 self.adapter.kafka.prefix = prefix;
2813 }
2814 if let Ok(protocol) = std::env::var("KAFKA_SECURITY_PROTOCOL") {
2815 self.adapter.kafka.security_protocol = Some(protocol);
2816 }
2817 if let Ok(mechanism) = std::env::var("KAFKA_SASL_MECHANISM") {
2818 self.adapter.kafka.sasl_mechanism = Some(mechanism);
2819 }
2820 if let Ok(username) = std::env::var("KAFKA_SASL_USERNAME") {
2821 self.adapter.kafka.sasl_username = Some(username);
2822 }
2823 if let Ok(password) = std::env::var("KAFKA_SASL_PASSWORD") {
2824 self.adapter.kafka.sasl_password = Some(password);
2825 }
2826 self.adapter.kafka.request_timeout_ms = parse_env::<u64>(
2827 "KAFKA_REQUEST_TIMEOUT_MS",
2828 self.adapter.kafka.request_timeout_ms,
2829 );
2830 if let Some(nodes) = parse_env_optional::<u32>("KAFKA_NODES_NUMBER") {
2831 self.adapter.kafka.nodes_number = Some(nodes);
2832 }
2833
2834 if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2836 let parsed: Vec<String> = origins.split(',').map(|s| s.trim().to_string()).collect();
2837 if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&parsed) {
2838 warn!(
2839 "CORS_ORIGINS contains invalid patterns: {}. Keeping previous CORS origins.",
2840 e
2841 );
2842 } else {
2843 self.cors.origin = parsed;
2844 }
2845 }
2846 if let Ok(methods) = std::env::var("CORS_METHODS") {
2847 self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2848 }
2849 if let Ok(headers) = std::env::var("CORS_HEADERS") {
2850 self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2851 }
2852 self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2853
2854 self.database_pooling.enabled =
2856 parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
2857 if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
2858 self.database_pooling.min = min;
2859 }
2860 if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
2861 self.database_pooling.max = max;
2862 }
2863
2864 if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
2865 self.database.mysql.connection_pool_size = pool_size;
2866 self.database.postgres.connection_pool_size = pool_size;
2867 }
2868 if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
2869 self.app_manager.cache.ttl = cache_ttl;
2870 self.channel_limits.cache_ttl = cache_ttl;
2871 self.database.mysql.cache_ttl = cache_ttl;
2872 self.database.postgres.cache_ttl = cache_ttl;
2873 self.database.surrealdb.cache_ttl = cache_ttl;
2874 self.cache.memory.ttl = cache_ttl;
2875 }
2876 if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
2877 self.database.mysql.cache_cleanup_interval = cleanup_interval;
2878 self.database.postgres.cache_cleanup_interval = cleanup_interval;
2879 self.cache.memory.cleanup_interval = cleanup_interval;
2880 }
2881 if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
2882 self.database.mysql.cache_max_capacity = max_capacity;
2883 self.database.postgres.cache_max_capacity = max_capacity;
2884 self.database.surrealdb.cache_max_capacity = max_capacity;
2885 self.cache.memory.max_capacity = max_capacity;
2886 }
2887
2888 let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
2889 let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
2890 let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
2891 let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
2892
2893 if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
2894 (default_app_id, default_app_key, default_app_secret)
2895 && default_app_enabled
2896 {
2897 let default_app = App::from_policy(
2898 app_id,
2899 app_key,
2900 app_secret,
2901 default_app_enabled,
2902 crate::app::AppPolicy {
2903 limits: crate::app::AppLimitsPolicy {
2904 max_connections: parse_env::<u32>(
2905 "SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS",
2906 100,
2907 ),
2908 max_backend_events_per_second: Some(parse_env::<u32>(
2909 "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
2910 100,
2911 )),
2912 max_client_events_per_second: parse_env::<u32>(
2913 "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
2914 100,
2915 ),
2916 max_read_requests_per_second: Some(parse_env::<u32>(
2917 "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
2918 100,
2919 )),
2920 max_presence_members_per_channel: Some(parse_env::<u32>(
2921 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
2922 100,
2923 )),
2924 max_presence_member_size_in_kb: Some(parse_env::<u32>(
2925 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
2926 100,
2927 )),
2928 max_channel_name_length: Some(parse_env::<u32>(
2929 "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
2930 100,
2931 )),
2932 max_event_channels_at_once: Some(parse_env::<u32>(
2933 "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
2934 100,
2935 )),
2936 max_event_name_length: Some(parse_env::<u32>(
2937 "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
2938 100,
2939 )),
2940 max_event_payload_in_kb: Some(parse_env::<u32>(
2941 "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
2942 100,
2943 )),
2944 max_event_batch_size: Some(parse_env::<u32>(
2945 "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
2946 100,
2947 )),
2948 decay_seconds: None,
2949 terminate_on_limit: false,
2950 message_rate_limit: None,
2951 },
2952 features: crate::app::AppFeaturesPolicy {
2953 enable_client_messages: std::env::var(
2954 "SOCKUDO_DEFAULT_APP_ENABLE_CLIENT_MESSAGES",
2955 )
2956 .ok()
2957 .map(|value| {
2958 matches!(
2959 value.trim().to_ascii_lowercase().as_str(),
2960 "true" | "1" | "yes" | "on"
2961 )
2962 })
2963 .unwrap_or_else(|| parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false)),
2964 enable_user_authentication: Some(parse_bool_env(
2965 "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
2966 false,
2967 )),
2968 enable_watchlist_events: Some(parse_bool_env(
2969 "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
2970 false,
2971 )),
2972 },
2973 channels: crate::app::AppChannelsPolicy {
2974 allowed_origins: {
2975 if let Ok(origins_str) =
2976 std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS")
2977 {
2978 if !origins_str.is_empty() {
2979 Some(
2980 origins_str
2981 .split(',')
2982 .map(|s| s.trim().to_string())
2983 .collect(),
2984 )
2985 } else {
2986 None
2987 }
2988 } else {
2989 None
2990 }
2991 },
2992 channel_delta_compression: None,
2993 channel_namespaces: None,
2994 },
2995 webhooks: None,
2996 idempotency: None,
2997 connection_recovery: None,
2998 history: None,
2999 presence_history: None,
3000 },
3001 );
3002
3003 self.app_manager.array.apps.push(default_app);
3004 info!("Successfully registered default app from env");
3005 }
3006
3007 if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
3009 info!("Applying REDIS_URL environment variable override");
3010
3011 let redis_url_json = sonic_rs::json!(redis_url_env);
3012
3013 self.adapter
3014 .redis
3015 .redis_pub_options
3016 .insert("url".to_string(), redis_url_json.clone());
3017 self.adapter
3018 .redis
3019 .redis_sub_options
3020 .insert("url".to_string(), redis_url_json);
3021
3022 self.cache.redis.url_override = Some(redis_url_env.clone());
3023 self.queue.redis.url_override = Some(redis_url_env.clone());
3024 self.rate_limiter.redis.url_override = Some(redis_url_env);
3025 }
3026
3027 let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
3029 let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
3030 if has_colors_env || has_target_env {
3031 let logging_config = self.logging.get_or_insert_with(Default::default);
3032 if has_colors_env {
3033 logging_config.colors_enabled =
3034 parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
3035 }
3036 if has_target_env {
3037 logging_config.include_target =
3038 parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
3039 }
3040 }
3041
3042 self.cleanup.async_enabled =
3044 parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
3045 self.cleanup.fallback_to_sync =
3046 parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
3047 self.cleanup.queue_buffer_size =
3048 parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
3049 self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
3050 self.cleanup.batch_timeout_ms =
3051 parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
3052 if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
3053 self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
3054 WorkerThreadsConfig::Auto
3055 } else if let Ok(n) = worker_threads_str.parse::<usize>() {
3056 WorkerThreadsConfig::Fixed(n)
3057 } else {
3058 warn!(
3059 "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
3060 worker_threads_str
3061 );
3062 self.cleanup.worker_threads.clone()
3063 };
3064 }
3065 self.cleanup.max_retry_attempts = parse_env::<u32>(
3066 "CLEANUP_MAX_RETRY_ATTEMPTS",
3067 self.cleanup.max_retry_attempts,
3068 );
3069
3070 self.cluster_health.enabled =
3072 parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
3073 self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
3074 "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
3075 self.cluster_health.heartbeat_interval_ms,
3076 );
3077 self.cluster_health.node_timeout_ms = parse_env::<u64>(
3078 "CLUSTER_HEALTH_NODE_TIMEOUT",
3079 self.cluster_health.node_timeout_ms,
3080 );
3081 self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
3082 "CLUSTER_HEALTH_CLEANUP_INTERVAL",
3083 self.cluster_health.cleanup_interval_ms,
3084 );
3085
3086 self.tag_filtering.enabled =
3088 parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
3089
3090 if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
3092 if val.to_lowercase() == "none" || val == "0" {
3093 self.websocket.max_messages = None;
3094 } else if let Ok(n) = val.parse::<usize>() {
3095 self.websocket.max_messages = Some(n);
3096 }
3097 }
3098 if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
3099 if val.to_lowercase() == "none" || val == "0" {
3100 self.websocket.max_bytes = None;
3101 } else if let Ok(n) = val.parse::<usize>() {
3102 self.websocket.max_bytes = Some(n);
3103 }
3104 }
3105 self.websocket.disconnect_on_buffer_full = parse_bool_env(
3106 "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
3107 self.websocket.disconnect_on_buffer_full,
3108 );
3109 self.websocket.max_message_size = parse_env::<usize>(
3110 "WEBSOCKET_MAX_MESSAGE_SIZE",
3111 self.websocket.max_message_size,
3112 );
3113 self.websocket.max_frame_size =
3114 parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
3115 self.websocket.write_buffer_size = parse_env::<usize>(
3116 "WEBSOCKET_WRITE_BUFFER_SIZE",
3117 self.websocket.write_buffer_size,
3118 );
3119 self.websocket.max_backpressure = parse_env::<usize>(
3120 "WEBSOCKET_MAX_BACKPRESSURE",
3121 self.websocket.max_backpressure,
3122 );
3123 self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
3124 self.websocket.ping_interval =
3125 parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
3126 self.websocket.idle_timeout =
3127 parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
3128 if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
3129 self.websocket.compression = mode;
3130 }
3131
3132 self.connection_recovery.enabled = parse_bool_env(
3134 "CONNECTION_RECOVERY_ENABLED",
3135 self.connection_recovery.enabled,
3136 );
3137 self.connection_recovery.buffer_ttl_seconds = parse_env::<u64>(
3138 "CONNECTION_RECOVERY_BUFFER_TTL",
3139 self.connection_recovery.buffer_ttl_seconds,
3140 );
3141 self.connection_recovery.max_buffer_size = parse_env::<usize>(
3142 "CONNECTION_RECOVERY_MAX_BUFFER_SIZE",
3143 self.connection_recovery.max_buffer_size,
3144 );
3145
3146 self.history.enabled = parse_bool_env("HISTORY_ENABLED", self.history.enabled);
3147 self.history.rewind_enabled =
3148 parse_bool_env("HISTORY_REWIND_ENABLED", self.history.rewind_enabled);
3149 self.history.retention_window_seconds = parse_env::<u64>(
3150 "HISTORY_RETENTION_WINDOW_SECONDS",
3151 self.history.retention_window_seconds,
3152 );
3153 self.history.max_page_size =
3154 parse_env::<usize>("HISTORY_MAX_PAGE_SIZE", self.history.max_page_size);
3155 self.history.writer_shards =
3156 parse_env::<usize>("HISTORY_WRITER_SHARDS", self.history.writer_shards);
3157 self.history.writer_queue_capacity = parse_env::<usize>(
3158 "HISTORY_WRITER_QUEUE_CAPACITY",
3159 self.history.writer_queue_capacity,
3160 );
3161 if let Ok(backend) = std::env::var("HISTORY_BACKEND") {
3162 self.history.backend = HistoryBackend::from_str(&backend)?;
3163 }
3164 if let Ok(max_messages) = std::env::var("HISTORY_MAX_MESSAGES_PER_CHANNEL") {
3165 self.history.max_messages_per_channel = Some(
3166 max_messages
3167 .parse::<usize>()
3168 .map_err(|e| format!("Invalid HISTORY_MAX_MESSAGES_PER_CHANNEL: {e}"))?,
3169 );
3170 }
3171 if let Ok(max_bytes) = std::env::var("HISTORY_MAX_BYTES_PER_CHANNEL") {
3172 self.history.max_bytes_per_channel = Some(
3173 max_bytes
3174 .parse::<u64>()
3175 .map_err(|e| format!("Invalid HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
3176 );
3177 }
3178 if let Ok(table_prefix) = std::env::var("HISTORY_POSTGRES_TABLE_PREFIX") {
3179 self.history.postgres.table_prefix = table_prefix;
3180 }
3181 self.history.postgres.write_timeout_ms = parse_env::<u64>(
3182 "HISTORY_POSTGRES_WRITE_TIMEOUT_MS",
3183 self.history.postgres.write_timeout_ms,
3184 );
3185 self.history.purge_interval_seconds = parse_env::<u64>(
3186 "HISTORY_PURGE_INTERVAL_SECONDS",
3187 self.history.purge_interval_seconds,
3188 );
3189 self.history.purge_batch_size =
3190 parse_env::<usize>("HISTORY_PURGE_BATCH_SIZE", self.history.purge_batch_size);
3191 self.history.max_purge_per_tick = parse_env::<usize>(
3192 "HISTORY_MAX_PURGE_PER_TICK",
3193 self.history.max_purge_per_tick,
3194 );
3195
3196 self.presence_history.enabled =
3197 parse_bool_env("PRESENCE_HISTORY_ENABLED", self.presence_history.enabled);
3198 self.presence_history.retention_window_seconds = parse_env::<u64>(
3199 "PRESENCE_HISTORY_RETENTION_WINDOW_SECONDS",
3200 self.presence_history.retention_window_seconds,
3201 );
3202 self.presence_history.max_page_size = parse_env::<usize>(
3203 "PRESENCE_HISTORY_MAX_PAGE_SIZE",
3204 self.presence_history.max_page_size,
3205 );
3206 if let Ok(max_events) = std::env::var("PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL") {
3207 self.presence_history.max_events_per_channel =
3208 Some(max_events.parse::<usize>().map_err(|e| {
3209 format!("Invalid PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL: {e}")
3210 })?);
3211 }
3212 if let Ok(max_bytes) = std::env::var("PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL") {
3213 self.presence_history.max_bytes_per_channel = Some(
3214 max_bytes
3215 .parse::<u64>()
3216 .map_err(|e| format!("Invalid PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
3217 );
3218 }
3219 self.idempotency.enabled = parse_bool_env("IDEMPOTENCY_ENABLED", self.idempotency.enabled);
3220 self.idempotency.ttl_seconds =
3221 parse_env::<u64>("IDEMPOTENCY_TTL_SECONDS", self.idempotency.ttl_seconds);
3222 self.idempotency.max_key_length = parse_env::<usize>(
3223 "IDEMPOTENCY_MAX_KEY_LENGTH",
3224 self.idempotency.max_key_length,
3225 );
3226
3227 self.ephemeral.enabled = parse_bool_env("EPHEMERAL_ENABLED", self.ephemeral.enabled);
3228
3229 self.echo_control.enabled =
3230 parse_bool_env("ECHO_CONTROL_ENABLED", self.echo_control.enabled);
3231 self.echo_control.default_echo_messages = parse_bool_env(
3232 "ECHO_CONTROL_DEFAULT_ECHO_MESSAGES",
3233 self.echo_control.default_echo_messages,
3234 );
3235
3236 self.event_name_filtering.enabled = parse_bool_env(
3237 "EVENT_NAME_FILTERING_ENABLED",
3238 self.event_name_filtering.enabled,
3239 );
3240 self.event_name_filtering.max_events_per_filter = parse_env::<usize>(
3241 "EVENT_NAME_FILTERING_MAX_EVENTS_PER_FILTER",
3242 self.event_name_filtering.max_events_per_filter,
3243 );
3244 self.event_name_filtering.max_event_name_length = parse_env::<usize>(
3245 "EVENT_NAME_FILTERING_MAX_EVENT_NAME_LENGTH",
3246 self.event_name_filtering.max_event_name_length,
3247 );
3248 self.versioned_messages.enabled = parse_bool_env(
3249 "VERSIONED_MESSAGES_ENABLED",
3250 self.versioned_messages.enabled,
3251 );
3252 if let Ok(driver_str) = std::env::var("VERSIONED_MESSAGES_DRIVER") {
3253 self.versioned_messages.driver = parse_driver_enum(
3254 driver_str,
3255 self.versioned_messages.driver.clone(),
3256 "VersionedMessages Backend",
3257 );
3258 }
3259 self.versioned_messages.max_page_size = parse_env::<usize>(
3260 "VERSIONED_MESSAGES_MAX_PAGE_SIZE",
3261 self.versioned_messages.max_page_size,
3262 );
3263 self.versioned_messages.retention_window_seconds = parse_env::<u64>(
3264 "VERSIONED_MESSAGES_RETENTION_WINDOW_SECONDS",
3265 self.versioned_messages.retention_window_seconds,
3266 );
3267 self.versioned_messages.purge_interval_seconds = parse_env::<u64>(
3268 "VERSIONED_MESSAGES_PURGE_INTERVAL_SECONDS",
3269 self.versioned_messages.purge_interval_seconds,
3270 );
3271 self.versioned_messages.purge_batch_size = parse_env::<usize>(
3272 "VERSIONED_MESSAGES_PURGE_BATCH_SIZE",
3273 self.versioned_messages.purge_batch_size,
3274 );
3275 self.versioned_messages.max_purge_per_tick = parse_env::<usize>(
3276 "VERSIONED_MESSAGES_MAX_PURGE_PER_TICK",
3277 self.versioned_messages.max_purge_per_tick,
3278 );
3279
3280 Ok(())
3281 }
3282
3283 pub fn validate(&self) -> Result<(), String> {
3284 if self.unix_socket.enabled {
3285 if self.unix_socket.path.is_empty() {
3286 return Err(
3287 "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
3288 );
3289 }
3290
3291 self.validate_unix_socket_security()?;
3292
3293 if self.ssl.enabled {
3294 tracing::warn!(
3295 "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
3296 );
3297 }
3298
3299 if self.unix_socket.permission_mode > 0o777 {
3300 return Err(format!(
3301 "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
3302 self.unix_socket.permission_mode
3303 ));
3304 }
3305 }
3306
3307 if let Err(e) = self.cleanup.validate() {
3308 return Err(format!("Invalid cleanup configuration: {}", e));
3309 }
3310
3311 if self.history.enabled {
3312 if self.history.max_page_size == 0 {
3313 return Err("history.max_page_size must be greater than 0".to_string());
3314 }
3315 if self.history.writer_shards == 0 {
3316 return Err("history.writer_shards must be greater than 0".to_string());
3317 }
3318 if self.history.writer_queue_capacity == 0 {
3319 return Err("history.writer_queue_capacity must be greater than 0".to_string());
3320 }
3321 if self.history.retention_window_seconds == 0 {
3322 return Err("history.retention_window_seconds must be greater than 0".to_string());
3323 }
3324 if self.history.postgres.table_prefix.trim().is_empty() {
3325 return Err("history.postgres.table_prefix must not be empty".to_string());
3326 }
3327 }
3328
3329 if self.presence_history.enabled {
3330 if self.presence_history.max_page_size == 0 {
3331 return Err("presence_history.max_page_size must be greater than 0".to_string());
3332 }
3333 if self.presence_history.retention_window_seconds == 0 {
3334 return Err(
3335 "presence_history.retention_window_seconds must be greater than 0".to_string(),
3336 );
3337 }
3338 }
3339
3340 if self.versioned_messages.enabled && self.versioned_messages.max_page_size == 0 {
3341 return Err("versioned_messages.max_page_size must be greater than 0".to_string());
3342 }
3343
3344 Ok(())
3345 }
3346
3347 fn validate_unix_socket_security(&self) -> Result<(), String> {
3348 let path = &self.unix_socket.path;
3349
3350 if path.contains("../") || path.contains("..\\") {
3351 return Err(
3352 "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
3353 );
3354 }
3355
3356 if self.unix_socket.permission_mode & 0o002 != 0 {
3357 warn!(
3358 "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
3359 self.unix_socket.permission_mode
3360 );
3361 }
3362
3363 if self.unix_socket.permission_mode & 0o007 > 0o005 {
3364 warn!(
3365 "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
3366 self.unix_socket.permission_mode
3367 );
3368 }
3369
3370 if self.mode == "production" && path.starts_with("/tmp/") {
3371 warn!(
3372 "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
3373 path
3374 );
3375 }
3376
3377 if !path.starts_with('/') {
3378 return Err(
3379 "Unix socket path must be absolute (start with /) for security and reliability."
3380 .to_string(),
3381 );
3382 }
3383
3384 Ok(())
3385 }
3386}
3387
#[cfg(test)]
mod redis_connection_tests {
    use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};

    /// Fully specified standalone connection used as the baseline for the URL
    /// tests. Every field is set explicitly so the tests do not depend on
    /// `RedisConnection::default()`; each test overrides only what it exercises.
    fn standalone() -> RedisConnection {
        RedisConnection {
            host: String::from("127.0.0.1"),
            port: 6379,
            db: 0,
            username: None,
            password: None,
            key_prefix: String::from("sockudo:"),
            sentinels: Vec::new(),
            sentinel_password: None,
            name: String::from("mymaster"),
            cluster: RedisClusterConnection::default(),
            cluster_nodes: Vec::new(),
        }
    }

    // --- Standalone URLs ---------------------------------------------------

    #[test]
    fn test_standard_url_basic() {
        let url = standalone().to_url();
        assert_eq!(url, "redis://127.0.0.1:6379/0");
    }

    #[test]
    fn test_standard_url_with_password() {
        let conn = RedisConnection {
            db: 2,
            password: Some(String::from("secret")),
            ..standalone()
        };
        let url = conn.to_url();
        assert_eq!(url, "redis://:secret@127.0.0.1:6379/2");
    }

    #[test]
    fn test_standard_url_with_username_and_password() {
        let conn = RedisConnection {
            host: String::from("redis.example.com"),
            port: 6380,
            db: 1,
            username: Some(String::from("admin")),
            password: Some(String::from("pass123")),
            ..standalone()
        };
        let url = conn.to_url();
        assert_eq!(url, "redis://admin:pass123@redis.example.com:6380/1");
    }

    #[test]
    fn test_standard_url_with_special_chars_in_password() {
        // `@` and `#` must be percent-encoded in the userinfo section.
        let conn = RedisConnection {
            password: Some(String::from("pass@word#123")),
            ..standalone()
        };
        let url = conn.to_url();
        assert_eq!(url, "redis://:pass%40word%23123@127.0.0.1:6379/0");
    }

    // --- Sentinel detection ------------------------------------------------

    #[test]
    fn test_is_sentinel_configured_false() {
        let configured = RedisConnection::default().is_sentinel_configured();
        assert!(!configured);
    }

    #[test]
    fn test_is_sentinel_configured_true() {
        let only_sentinel = RedisSentinel {
            host: String::from("sentinel1"),
            port: 26379,
        };
        let conn = RedisConnection {
            sentinels: vec![only_sentinel],
            ..Default::default()
        };
        assert!(conn.is_sentinel_configured());
    }

    // --- Sentinel URLs -----------------------------------------------------

    #[test]
    fn test_sentinel_url_basic() {
        let first = RedisSentinel {
            host: String::from("sentinel1"),
            port: 26379,
        };
        let second = RedisSentinel {
            host: String::from("sentinel2"),
            port: 26379,
        };
        let conn = RedisConnection {
            sentinels: vec![first, second],
            ..standalone()
        };
        let url = conn.to_url();
        assert_eq!(
            url,
            "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0"
        );
    }

    #[test]
    fn test_sentinel_url_with_sentinel_password() {
        let conn = RedisConnection {
            sentinels: vec![RedisSentinel {
                host: String::from("sentinel1"),
                port: 26379,
            }],
            sentinel_password: Some(String::from("sentinelpass")),
            ..standalone()
        };
        let url = conn.to_url();
        assert_eq!(
            url,
            "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0"
        );
    }

    #[test]
    fn test_sentinel_url_with_master_password() {
        let conn = RedisConnection {
            db: 1,
            password: Some(String::from("masterpass")),
            sentinels: vec![RedisSentinel {
                host: String::from("sentinel1"),
                port: 26379,
            }],
            ..standalone()
        };
        let url = conn.to_url();
        assert_eq!(
            url,
            "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass"
        );
    }

    #[test]
    fn test_sentinel_url_with_all_auth() {
        let first = RedisSentinel {
            host: String::from("sentinel1"),
            port: 26379,
        };
        let second = RedisSentinel {
            host: String::from("sentinel2"),
            port: 26380,
        };
        let conn = RedisConnection {
            db: 2,
            username: Some(String::from("redisuser")),
            password: Some(String::from("redispass")),
            sentinels: vec![first, second],
            sentinel_password: Some(String::from("sentinelauth")),
            name: String::from("production-master"),
            ..standalone()
        };
        let url = conn.to_url();
        assert_eq!(
            url,
            "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser"
        );
    }

    #[test]
    fn test_sentinel_to_host_port() {
        let host_port = RedisSentinel {
            host: String::from("sentinel.example.com"),
            port: 26379,
        }
        .to_host_port();
        assert_eq!(host_port, "sentinel.example.com:26379");
    }

    // --- Cluster URLs ------------------------------------------------------

    #[test]
    fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
        let nodes = vec![
            ClusterNode {
                host: String::from("node1.secure-cluster.com"),
                port: 7000,
            },
            ClusterNode {
                host: String::from("redis://node2.secure-cluster.com:7001"),
                port: 7001,
            },
            ClusterNode {
                host: String::from("rediss://node3.secure-cluster.com"),
                port: 7002,
            },
        ];
        let conn = RedisConnection {
            cluster: RedisClusterConnection {
                nodes,
                username: None,
                password: Some(String::from("cluster-secret")),
                use_tls: true,
            },
            ..Default::default()
        };

        let expected = vec![
            "rediss://:cluster-secret@node1.secure-cluster.com:7000",
            "redis://:cluster-secret@node2.secure-cluster.com:7001",
            "rediss://:cluster-secret@node3.secure-cluster.com:7002",
        ];
        assert_eq!(conn.cluster_node_urls(), expected);
    }

    #[test]
    fn test_cluster_node_urls_fallback_to_legacy_nodes() {
        let legacy_node = ClusterNode {
            host: String::from("legacy-node.example.com"),
            port: 7000,
        };
        let conn = RedisConnection {
            password: Some(String::from("fallback-secret")),
            cluster_nodes: vec![legacy_node],
            ..Default::default()
        };

        let expected = vec!["redis://:fallback-secret@legacy-node.example.com:7000"];
        assert_eq!(conn.cluster_node_urls(), expected);
    }

    #[test]
    fn test_normalize_cluster_seed_urls() {
        let conn = RedisConnection {
            cluster: RedisClusterConnection {
                nodes: Vec::new(),
                username: Some(String::from("svc-user")),
                password: Some(String::from("svc-pass")),
                use_tls: true,
            },
            ..Default::default()
        };

        let seeds: Vec<String> = [
            "node1.example.com:7000",
            "redis://node2.example.com:7001",
            "rediss://node3.example.com",
        ]
        .iter()
        .map(|seed| seed.to_string())
        .collect();

        let expected = vec![
            "rediss://svc-user:svc-pass@node1.example.com:7000",
            "redis://svc-user:svc-pass@node2.example.com:7001",
            "rediss://svc-user:svc-pass@node3.example.com:6379",
        ];
        assert_eq!(conn.normalize_cluster_seed_urls(&seeds), expected);
    }
}
3682
#[cfg(test)]
mod cluster_node_tests {
    use super::ClusterNode;

    /// Builds a node whose `port` field is 6379, the value most tests here use.
    fn node_on_default_port(host: &str) -> ClusterNode {
        ClusterNode {
            host: host.to_owned(),
            port: 6379,
        }
    }

    // --- Plain hosts -------------------------------------------------------

    #[test]
    fn test_to_url_basic_host() {
        let url = node_on_default_port("localhost").to_url();
        assert_eq!(url, "redis://localhost:6379");
    }

    #[test]
    fn test_to_url_ip_address() {
        let url = node_on_default_port("127.0.0.1").to_url();
        assert_eq!(url, "redis://127.0.0.1:6379");
    }

    // --- Hosts carrying a scheme --------------------------------------------

    #[test]
    fn test_to_url_with_redis_protocol() {
        let url = node_on_default_port("redis://example.com").to_url();
        assert_eq!(url, "redis://example.com:6379");
    }

    #[test]
    fn test_to_url_with_rediss_protocol() {
        let url = node_on_default_port("rediss://secure.example.com").to_url();
        assert_eq!(url, "rediss://secure.example.com:6379");
    }

    #[test]
    fn test_to_url_with_rediss_protocol_and_port_in_url() {
        // A port embedded in the host string wins over the `port` field.
        let url = node_on_default_port("rediss://secure.example.com:7000").to_url();
        assert_eq!(url, "rediss://secure.example.com:7000");
    }

    #[test]
    fn test_to_url_with_redis_protocol_and_port_in_url() {
        let url = node_on_default_port("redis://example.com:7001").to_url();
        assert_eq!(url, "redis://example.com:7001");
    }

    #[test]
    fn test_to_url_with_trailing_whitespace() {
        let url = node_on_default_port(" rediss://secure.example.com ").to_url();
        assert_eq!(url, "rediss://secure.example.com:6379");
    }

    // --- Non-default ports --------------------------------------------------

    #[test]
    fn test_to_url_custom_port() {
        let url = ClusterNode {
            host: "redis-cluster.example.com".to_owned(),
            port: 7000,
        }
        .to_url();
        assert_eq!(url, "redis://redis-cluster.example.com:7000");
    }

    #[test]
    fn test_to_url_plain_host_with_port_in_host_field() {
        let url = ClusterNode {
            host: "redis-cluster.example.com:7010".to_owned(),
            port: 7000,
        }
        .to_url();
        assert_eq!(url, "redis://redis-cluster.example.com:7010");
    }

    // --- Auth / TLS options -------------------------------------------------

    #[test]
    fn test_to_url_with_options_adds_auth_and_tls() {
        let node = ClusterNode {
            host: "node.example.com".to_owned(),
            port: 7000,
        };
        let url = node.to_url_with_options(true, Some("svc-user"), Some("secret"));
        assert_eq!(url, "rediss://svc-user:secret@node.example.com:7000");
    }

    #[test]
    fn test_to_url_with_options_keeps_embedded_auth() {
        // Credentials already present in the host string are not replaced by
        // the ones passed as options.
        let node = ClusterNode {
            host: "rediss://:node-secret@node.example.com:7000".to_owned(),
            port: 7000,
        };
        let url = node.to_url_with_options(true, Some("global-user"), Some("global-secret"));
        assert_eq!(url, "rediss://:node-secret@node.example.com:7000");
    }

    // --- Seed parsing -------------------------------------------------------

    #[test]
    fn test_from_seed_parses_plain_host_port() {
        let parsed = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
        assert_eq!(parsed.host, "cluster-node-1");
        assert_eq!(parsed.port, 7005);
    }

    #[test]
    fn test_from_seed_keeps_scheme_urls() {
        let seed = "rediss://secure.example.com:7005";
        let parsed = ClusterNode::from_seed(seed).expect("node should parse");
        assert_eq!(parsed.host, "rediss://secure.example.com:7005");
        assert_eq!(parsed.port, 7005);
    }

    // --- Real-world style hostnames ----------------------------------------

    #[test]
    fn test_to_url_aws_elasticache_hostname() {
        let url = node_on_default_port("rediss://my-cluster.use1.cache.amazonaws.com").to_url();
        assert_eq!(url, "rediss://my-cluster.use1.cache.amazonaws.com:6379");
    }

    // --- IPv6 literals ------------------------------------------------------

    #[test]
    fn test_to_url_with_ipv6_no_port() {
        let url = node_on_default_port("rediss://[::1]").to_url();
        assert_eq!(url, "rediss://[::1]:6379");
    }

    #[test]
    fn test_to_url_with_ipv6_and_port_in_url() {
        let url = node_on_default_port("rediss://[::1]:7000").to_url();
        assert_eq!(url, "rediss://[::1]:7000");
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_no_port() {
        let url = node_on_default_port("rediss://[2001:db8::1]").to_url();
        assert_eq!(url, "rediss://[2001:db8::1]:6379");
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_with_port() {
        let url = node_on_default_port("rediss://[2001:db8::1]:7000").to_url();
        assert_eq!(url, "rediss://[2001:db8::1]:7000");
    }

    #[test]
    fn test_to_url_with_redis_protocol_ipv6() {
        let url = node_on_default_port("redis://[::1]").to_url();
        assert_eq!(url, "redis://[::1]:6379");
    }
}
3864
#[cfg(test)]
mod cors_config_tests {
    use super::CorsConfig;

    /// Deserializes a `CorsConfig` from a JSON document via `sonic_rs`.
    fn cors_from_json(json: &str) -> sonic_rs::Result<CorsConfig> {
        sonic_rs::from_str(json)
    }

    #[test]
    fn test_deserialize_valid_exact_origins() {
        let json = r#"{"origin": ["https://example.com", "https://other.com"]}"#;
        let parsed = cors_from_json(json).unwrap();
        assert_eq!(parsed.origin.len(), 2);
    }

    #[test]
    fn test_deserialize_valid_wildcard_patterns() {
        let json = r#"{"origin": ["*.example.com", "https://*.staging.example.com"]}"#;
        let parsed = cors_from_json(json).unwrap();
        assert_eq!(parsed.origin.len(), 2);
    }

    #[test]
    fn test_deserialize_allows_special_markers() {
        let accepted = [
            r#"{"origin": ["*"]}"#,
            r#"{"origin": ["Any"]}"#,
            r#"{"origin": ["any"]}"#,
            r#"{"origin": ["*", "https://example.com"]}"#,
        ];
        for json in accepted {
            assert!(cors_from_json(json).is_ok(), "expected Ok for {}", json);
        }
    }

    #[test]
    fn test_deserialize_rejects_invalid_patterns() {
        let rejected = [
            r#"{"origin": ["*.*bad"]}"#,
            r#"{"origin": ["*."]}"#,
            r#"{"origin": [""]}"#,
            r#"{"origin": ["https://"]}"#,
        ];
        for json in rejected {
            assert!(cors_from_json(json).is_err(), "expected Err for {}", json);
        }
    }

    #[test]
    fn test_deserialize_rejects_mixed_valid_and_invalid() {
        // One bad entry must fail the whole list, even next to a valid origin.
        let json = r#"{"origin": ["https://good.com", "*.*bad"]}"#;
        assert!(cors_from_json(json).is_err());
    }
}
3908}