1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12 D: serde::Deserializer<'de>,
13{
14 use serde::de::{self};
15
16 let s = String::deserialize(deserializer)?;
17
18 if !s.chars().all(|c| c.is_digit(8)) {
20 return Err(de::Error::custom(format!(
21 "invalid octal permission mode '{}': must contain only digits 0-7",
22 s
23 )));
24 }
25
26 let mode = u32::from_str_radix(&s, 8)
28 .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30 if mode > 0o777 {
32 return Err(de::Error::custom(format!(
33 "permission mode '{}' exceeds maximum value 777",
34 s
35 )));
36 }
37
38 Ok(mode)
39}
40
41fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43 driver_str: String,
44 default_driver: T,
45 driver_name: &str,
46) -> T
47where
48 <T as FromStr>::Err: std::fmt::Debug,
49{
50 match T::from_str(&driver_str.to_lowercase()) {
51 Ok(driver_enum) => driver_enum,
52 Err(e) => {
53 warn!(
54 "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55 driver_name, driver_str, e, default_driver
56 );
57 default_driver
58 }
59 }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63 if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64 db_conn.pool_min = Some(min);
65 }
66 if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67 db_conn.pool_max = Some(max);
68 }
69}
70
/// Identifies which adapter backend is selected.
///
/// Serde uses the lowercased variant name (for example `RabbitMq` becomes
/// `"rabbitmq"`), except where a variant carries an explicit hyphenated
/// rename. `Local` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum AdapterDriver {
    #[default]
    Local,
    Redis,
    /// Serialized as `"redis-cluster"`.
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    Nats,
    Pulsar,
    RabbitMq,
    /// Serialized as `"google-pubsub"`.
    #[serde(rename = "google-pubsub")]
    GooglePubSub,
    Kafka,
    Iggy,
}
89
/// DynamoDB settings.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DynamoDbSettings {
    pub region: String,
    pub table_name: String,
    // Presumably a custom endpoint (e.g. a local emulator) - confirm against the consumer.
    pub endpoint_url: Option<String>,
    pub aws_access_key_id: Option<String>,
    pub aws_secret_access_key: Option<String>,
    pub aws_profile_name: Option<String>,
}
100
101impl Default for DynamoDbSettings {
102 fn default() -> Self {
103 Self {
104 region: "us-east-1".to_string(),
105 table_name: "sockudo-applications".to_string(),
106 endpoint_url: None,
107 aws_access_key_id: None,
108 aws_secret_access_key: None,
109 aws_profile_name: None,
110 }
111 }
112}
113
/// ScyllaDB settings.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ScyllaDbSettings {
    /// Node addresses; the default entry has the form `host:port`.
    pub nodes: Vec<String>,
    pub keyspace: String,
    pub table_name: String,
    pub username: Option<String>,
    pub password: Option<String>,
    pub replication_class: String,
    pub replication_factor: u32,
}
125
126impl Default for ScyllaDbSettings {
127 fn default() -> Self {
128 Self {
129 nodes: vec!["127.0.0.1:9042".to_string()],
130 keyspace: "sockudo".to_string(),
131 table_name: "applications".to_string(),
132 username: None,
133 password: None,
134 replication_class: "SimpleStrategy".to_string(),
135 replication_factor: 3,
136 }
137 }
138}
139
/// SurrealDB settings.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SurrealDbSettings {
    pub url: String,
    pub namespace: String,
    pub database: String,
    pub username: String,
    pub password: String,
    pub table_name: String,
    // Unit is not visible here; presumably seconds - confirm against the consumer.
    pub cache_ttl: u64,
    pub cache_max_capacity: u64,
}
152
153impl Default for SurrealDbSettings {
154 fn default() -> Self {
155 Self {
156 url: "ws://127.0.0.1:8000".to_string(),
157 namespace: "sockudo".to_string(),
158 database: "sockudo".to_string(),
159 username: "root".to_string(),
160 password: "root".to_string(),
161 table_name: "applications".to_string(),
162 cache_ttl: 300,
163 cache_max_capacity: 100,
164 }
165 }
166}
167
168impl FromStr for AdapterDriver {
169 type Err = String;
170 fn from_str(s: &str) -> Result<Self, Self::Err> {
171 match s.to_lowercase().as_str() {
172 "local" => Ok(AdapterDriver::Local),
173 "redis" => Ok(AdapterDriver::Redis),
174 "redis-cluster" => Ok(AdapterDriver::RedisCluster),
175 "nats" => Ok(AdapterDriver::Nats),
176 "pulsar" => Ok(AdapterDriver::Pulsar),
177 "rabbitmq" | "rabbit-mq" => Ok(AdapterDriver::RabbitMq),
178 "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(AdapterDriver::GooglePubSub),
179 "kafka" => Ok(AdapterDriver::Kafka),
180 "iggy" | "apache-iggy" | "apache_iggy" => Ok(AdapterDriver::Iggy),
181 _ => Err(format!("Unknown adapter driver: {s}")),
182 }
183 }
184}
185
/// Identifies which app manager backend is selected.
///
/// Serde uses the lowercased variant name (`PgSql` becomes `"pgsql"`,
/// `SurrealDb` becomes `"surrealdb"`, and so on). `Memory` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum AppManagerDriver {
    #[default]
    Memory,
    Mysql,
    Dynamodb,
    PgSql,
    SurrealDb,
    ScyllaDb,
}
197impl FromStr for AppManagerDriver {
198 type Err = String;
199 fn from_str(s: &str) -> Result<Self, Self::Err> {
200 match s.to_lowercase().as_str() {
201 "memory" => Ok(AppManagerDriver::Memory),
202 "mysql" => Ok(AppManagerDriver::Mysql),
203 "dynamodb" => Ok(AppManagerDriver::Dynamodb),
204 "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
205 "surreal" | "surrealdb" => Ok(AppManagerDriver::SurrealDb),
206 "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
207 _ => Err(format!("Unknown app manager driver: {s}")),
208 }
209 }
210}
211
/// Identifies which cache backend is selected.
///
/// Serde uses the lowercased variant name, except `RedisCluster`, which is
/// `"redis-cluster"`. `Memory` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum CacheDriver {
    #[default]
    Memory,
    Redis,
    /// Serialized as `"redis-cluster"`.
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    None,
}
222
223impl FromStr for CacheDriver {
224 type Err = String;
225 fn from_str(s: &str) -> Result<Self, Self::Err> {
226 match s.to_lowercase().as_str() {
227 "memory" => Ok(CacheDriver::Memory),
228 "redis" => Ok(CacheDriver::Redis),
229 "redis-cluster" => Ok(CacheDriver::RedisCluster),
230 "none" => Ok(CacheDriver::None),
231 _ => Err(format!("Unknown cache driver: {s}")),
232 }
233 }
234}
235
/// Identifies which queue backend is selected.
///
/// Serde uses the lowercased variant name, except where a variant carries an
/// explicit hyphenated rename. `Redis` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum QueueDriver {
    Memory,
    #[default]
    Redis,
    /// Serialized as `"redis-cluster"`.
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    Nats,
    Pulsar,
    RabbitMq,
    /// Serialized as `"google-pubsub"`.
    #[serde(rename = "google-pubsub")]
    GooglePubSub,
    Kafka,
    Iggy,
    Sqs,
    Sns,
    None,
}
255
256impl FromStr for QueueDriver {
257 type Err = String;
258 fn from_str(s: &str) -> Result<Self, Self::Err> {
259 match s.to_lowercase().as_str() {
260 "memory" => Ok(QueueDriver::Memory),
261 "redis" => Ok(QueueDriver::Redis),
262 "redis-cluster" => Ok(QueueDriver::RedisCluster),
263 "nats" => Ok(QueueDriver::Nats),
264 "pulsar" => Ok(QueueDriver::Pulsar),
265 "rabbitmq" | "rabbit-mq" => Ok(QueueDriver::RabbitMq),
266 "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(QueueDriver::GooglePubSub),
267 "kafka" => Ok(QueueDriver::Kafka),
268 "iggy" | "apache-iggy" | "apache_iggy" => Ok(QueueDriver::Iggy),
269 "sqs" => Ok(QueueDriver::Sqs),
270 "sns" => Ok(QueueDriver::Sns),
271 "none" => Ok(QueueDriver::None),
272 _ => Err(format!("Unknown queue driver: {s}")),
273 }
274 }
275}
276
/// Identifies which backend is selected for delta coordination.
///
/// Serde uses snake_case variant names, so `RedisCluster` is
/// `"redis_cluster"`. `Auto` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum DeltaCoordinationBackend {
    #[default]
    Auto,
    None,
    Redis,
    RedisCluster,
    Nats,
}
287
288impl FromStr for DeltaCoordinationBackend {
289 type Err = String;
290
291 fn from_str(s: &str) -> Result<Self, Self::Err> {
292 match s.trim().to_ascii_lowercase().as_str() {
293 "auto" => Ok(Self::Auto),
294 "none" => Ok(Self::None),
295 "redis" => Ok(Self::Redis),
296 "redis-cluster" | "redis_cluster" => Ok(Self::RedisCluster),
297 "nats" => Ok(Self::Nats),
298 _ => Err(format!("Unknown delta coordination backend: {s}")),
299 }
300 }
301}
302
303impl AsRef<str> for QueueDriver {
304 fn as_ref(&self) -> &str {
305 match self {
306 QueueDriver::Memory => "memory",
307 QueueDriver::Redis => "redis",
308 QueueDriver::RedisCluster => "redis-cluster",
309 QueueDriver::Nats => "nats",
310 QueueDriver::Pulsar => "pulsar",
311 QueueDriver::RabbitMq => "rabbitmq",
312 QueueDriver::GooglePubSub => "google-pubsub",
313 QueueDriver::Kafka => "kafka",
314 QueueDriver::Iggy => "iggy",
315 QueueDriver::Sqs => "sqs",
316 QueueDriver::Sns => "sns",
317 QueueDriver::None => "none",
318 }
319 }
320}
321
/// Queue settings for a Redis cluster.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisClusterQueueConfig {
    pub concurrency: u32,
    pub prefix: Option<String>,
    /// Cluster node URLs; the default entry has the form `redis://host:port`.
    pub nodes: Vec<String>,
    /// Request timeout in milliseconds.
    pub request_timeout_ms: u64,
}
330
331impl Default for RedisClusterQueueConfig {
332 fn default() -> Self {
333 Self {
334 concurrency: 5,
335 prefix: Some("sockudo_queue:".to_string()),
336 nodes: vec!["redis://127.0.0.1:6379".to_string()],
337 request_timeout_ms: 5000,
338 }
339 }
340}
341
/// Identifies which metrics backend is selected.
///
/// `Prometheus` (serialized as `"prometheus"`) is the only variant and the
/// default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum MetricsDriver {
    #[default]
    Prometheus,
}
348
/// Selects the log output format.
///
/// Serialized in lowercase (`"human"` / `"json"`). `Human` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum LogOutputFormat {
    #[default]
    Human,
    Json,
}
356
357impl FromStr for LogOutputFormat {
358 type Err = String;
359 fn from_str(s: &str) -> Result<Self, Self::Err> {
360 match s.to_lowercase().as_str() {
361 "human" => Ok(LogOutputFormat::Human),
362 "json" => Ok(LogOutputFormat::Json),
363 _ => Err(format!("Unknown log output format: {s}")),
364 }
365 }
366}
367
368impl FromStr for MetricsDriver {
369 type Err = String;
370 fn from_str(s: &str) -> Result<Self, Self::Err> {
371 match s.to_lowercase().as_str() {
372 "prometheus" => Ok(MetricsDriver::Prometheus),
373 _ => Err(format!("Unknown metrics driver: {s}")),
374 }
375 }
376}
377
378impl AsRef<str> for MetricsDriver {
379 fn as_ref(&self) -> &str {
380 match self {
381 MetricsDriver::Prometheus => "prometheus",
382 }
383 }
384}
385
/// Top-level server configuration, grouping the settings of every subsystem
/// alongside a handful of scalar options.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ServerOptions {
    pub adapter: AdapterConfig,
    pub app_manager: AppManagerConfig,
    pub cache: CacheConfig,
    pub channel_limits: ChannelLimits,
    pub cors: CorsConfig,
    pub database: DatabaseConfig,
    pub database_pooling: DatabasePooling,
    pub debug: bool,
    pub event_limits: EventLimits,
    pub host: String,
    pub http_api: HttpApiConfig,
    pub instance: InstanceConfig,
    /// Optional, unlike the other sections.
    pub logging: Option<LoggingConfig>,
    pub metrics: MetricsConfig,
    pub mode: String,
    pub port: u16,
    pub path_prefix: String,
    pub presence: PresenceConfig,
    pub queue: QueueConfig,
    pub rate_limiter: RateLimiterConfig,
    // Unit is not visible here; presumably seconds - confirm against the consumer.
    pub shutdown_grace_period: u64,
    pub ssl: SslConfig,
    pub user_authentication_timeout: u64,
    pub webhooks: WebhooksConfig,
    /// Maximum WebSocket payload size in kilobytes.
    pub websocket_max_payload_kb: u32,
    pub cleanup: CleanupConfig,
    pub activity_timeout: u64,
    pub cluster_health: ClusterHealthConfig,
    pub unix_socket: UnixSocketConfig,
    pub delta_compression: DeltaCompressionOptionsConfig,
    pub tag_filtering: TagFilteringConfig,
    pub websocket: WebSocketConfig,
    pub connection_recovery: ConnectionRecoveryConfig,
    pub history: HistoryConfig,
    pub presence_history: PresenceHistoryConfig,
    pub idempotency: IdempotencyConfig,
    pub ephemeral: EphemeralConfig,
    pub echo_control: EchoControlConfig,
    pub event_name_filtering: EventNameFilteringConfig,
    pub versioned_messages: VersionedMessagesConfig,
    pub annotations: AnnotationsConfig,
    pub push: PushConfig,
    /// Health check timeout in milliseconds.
    pub health_check_timeout_ms: u64,
}
436
/// Settings for the SQS queue driver.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SqsQueueConfig {
    pub region: String,
    pub queue_url_prefix: Option<String>,
    pub visibility_timeout: i32,
    pub endpoint_url: Option<String>,
    pub max_messages: i32,
    /// Wait time in seconds.
    pub wait_time_seconds: i32,
    pub concurrency: u32,
    pub fifo: bool,
    // Presumably only relevant when `fifo` is set - confirm against the consumer.
    pub message_group_id: Option<String>,
}
452
/// Settings for the SNS queue driver.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SnsQueueConfig {
    pub region: String,
    pub topic_arn: String,
    pub endpoint_url: Option<String>,
}
460
/// Adapter configuration: the selected driver plus one settings section per
/// supported backend.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation, except for the three fields that name
/// their own default function.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AdapterConfig {
    pub driver: AdapterDriver,
    pub redis: RedisAdapterConfig,
    /// Redis cluster settings.
    pub cluster: RedisClusterAdapterConfig,
    pub nats: NatsAdapterConfig,
    pub pulsar: PulsarAdapterConfig,
    pub rabbitmq: RabbitMqAdapterConfig,
    pub google_pubsub: GooglePubSubAdapterConfig,
    pub kafka: KafkaAdapterConfig,
    pub iggy: IggyConfig,
    /// Falls back to `default_buffer_multiplier_per_cpu()` when missing.
    #[serde(default = "default_buffer_multiplier_per_cpu")]
    pub buffer_multiplier_per_cpu: usize,
    pub cluster_health: ClusterHealthConfig,
    /// Falls back to `default_enable_socket_counting()` when missing.
    #[serde(default = "default_enable_socket_counting")]
    pub enable_socket_counting: bool,
    /// Falls back to `default_fallback_to_local()` when missing.
    #[serde(default = "default_fallback_to_local")]
    pub fallback_to_local: bool,
}
481
/// Serde default for `AdapterConfig::enable_socket_counting`: `true`.
fn default_enable_socket_counting() -> bool {
    true
}
485
/// Serde default for `AdapterConfig::fallback_to_local`: `true`.
fn default_fallback_to_local() -> bool {
    true
}
489
/// Serde default for `AdapterConfig::buffer_multiplier_per_cpu`: `64`.
fn default_buffer_multiplier_per_cpu() -> usize {
    64
}
493
494impl Default for AdapterConfig {
495 fn default() -> Self {
496 Self {
497 driver: AdapterDriver::default(),
498 redis: RedisAdapterConfig::default(),
499 cluster: RedisClusterAdapterConfig::default(),
500 nats: NatsAdapterConfig::default(),
501 pulsar: PulsarAdapterConfig::default(),
502 rabbitmq: RabbitMqAdapterConfig::default(),
503 google_pubsub: GooglePubSubAdapterConfig::default(),
504 kafka: KafkaAdapterConfig::default(),
505 iggy: IggyConfig::default(),
506 buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
507 cluster_health: ClusterHealthConfig::default(),
508 enable_socket_counting: default_enable_socket_counting(),
509 fallback_to_local: default_fallback_to_local(),
510 }
511 }
512}
513
/// Settings for the Redis adapter.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisAdapterConfig {
    // Unit is not visible here; presumably milliseconds - confirm against the consumer.
    pub requests_timeout: u64,
    pub prefix: String,
    /// Free-form option map keyed by option name.
    pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
    /// Free-form option map keyed by option name.
    pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
    pub cluster_mode: bool,
}
523
/// Settings for the Redis cluster adapter.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisClusterAdapterConfig {
    pub nodes: Vec<String>,
    pub prefix: String,
    /// Request timeout in milliseconds.
    pub request_timeout_ms: u64,
    pub use_connection_manager: bool,
    /// The field-level attribute makes a missing value `false` (`bool::default()`)
    /// rather than the value from this type's `Default` implementation.
    #[serde(default)]
    pub use_sharded_pubsub: bool,
}
534
/// Settings for the NATS adapter.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct NatsAdapterConfig {
    pub servers: Vec<String>,
    pub prefix: String,
    /// Request timeout in milliseconds.
    pub request_timeout_ms: u64,
    pub username: Option<String>,
    pub password: Option<String>,
    pub token: Option<String>,
    /// Connection timeout in milliseconds.
    pub connection_timeout_ms: u64,
    pub nodes_number: Option<u32>,
    /// Milliseconds.
    pub discovery_max_wait_ms: u64,
    /// Milliseconds.
    pub discovery_idle_wait_ms: u64,
    pub subscription_capacity: Option<usize>,
    pub client_capacity: Option<usize>,
    pub max_reconnects: Option<usize>,
    pub presence_sync_chunk_size: Option<usize>,
}
553
/// Settings for the Pulsar adapter.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PulsarAdapterConfig {
    pub url: String,
    pub prefix: String,
    /// Request timeout in milliseconds.
    pub request_timeout_ms: u64,
    pub token: Option<String>,
    pub nodes_number: Option<u32>,
}
563
/// Settings for the RabbitMQ adapter.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RabbitMqAdapterConfig {
    pub url: String,
    pub prefix: String,
    /// Request timeout in milliseconds.
    pub request_timeout_ms: u64,
    /// Connection timeout in milliseconds.
    pub connection_timeout_ms: u64,
    pub nodes_number: Option<u32>,
}
573
/// Settings for the Google Pub/Sub adapter.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GooglePubSubAdapterConfig {
    pub project_id: String,
    pub prefix: String,
    /// Request timeout in milliseconds.
    pub request_timeout_ms: u64,
    pub emulator_host: Option<String>,
    pub nodes_number: Option<u32>,
}
583
/// Settings for the Kafka adapter.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct KafkaAdapterConfig {
    pub brokers: Vec<String>,
    pub prefix: String,
    /// Request timeout in milliseconds.
    pub request_timeout_ms: u64,
    pub security_protocol: Option<String>,
    pub sasl_mechanism: Option<String>,
    pub sasl_username: Option<String>,
    pub sasl_password: Option<String>,
    pub nodes_number: Option<u32>,
}
596
/// Settings for Iggy.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct IggyConfig {
    pub connection_string: String,
    pub username: Option<String>,
    pub password: Option<String>,
    pub consumer_name: Option<String>,
    pub stream: String,
    pub topic_prefix: String,
    // Presumably used when Iggy backs the queue rather than the adapter - confirm.
    pub queue_topic_prefix: String,
    pub consumer_group_prefix: String,
    /// Request timeout in milliseconds.
    pub request_timeout_ms: u64,
    /// Poll interval in milliseconds.
    pub poll_interval_ms: u64,
    pub poll_batch_size: u32,
    pub partitions_count: u32,
    pub partition_id: u32,
    pub auto_create: bool,
    pub start_from_latest: bool,
    pub nodes_number: Option<u32>,
}
617
/// App manager configuration: the selected driver plus its settings sections.
///
/// `Default` is derived, and with `#[serde(default)]` any missing field takes
/// its value from that derived default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct AppManagerConfig {
    pub driver: AppManagerDriver,
    pub array: ArrayConfig,
    pub cache: CacheSettings,
    pub scylladb: ScyllaDbSettings,
}
626
/// A list of apps declared directly in the configuration.
///
/// Defaults to an empty list.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ArrayConfig {
    pub apps: Vec<App>,
}
632
/// Cache toggle and time-to-live used by the app manager configuration.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CacheSettings {
    pub enabled: bool,
    // Unit is not visible here; presumably seconds - confirm against the consumer.
    pub ttl: u64,
}
639
/// Options for the in-memory cache.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MemoryCacheOptions {
    // Units for `ttl` and `cleanup_interval` are not visible here; presumably seconds - confirm.
    pub ttl: u64,
    pub cleanup_interval: u64,
    pub max_capacity: u64,
}
647
/// Cache configuration: the selected driver plus Redis and in-memory settings.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CacheConfig {
    pub driver: CacheDriver,
    pub redis: RedisConfig,
    pub memory: MemoryCacheOptions,
}
655
/// Redis settings used by the cache configuration.
///
/// `Default` is derived, so both options default to `None` and
/// `cluster_mode` to `false`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RedisConfig {
    pub prefix: Option<String>,
    // Presumably replaces the URL built from the shared Redis connection settings - confirm.
    pub url_override: Option<String>,
    pub cluster_mode: bool,
}
663
/// Limits that apply to channels.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ChannelLimits {
    pub max_name_length: u32,
    // Unit is not visible here; presumably seconds - confirm against the consumer.
    pub cache_ttl: u64,
}
670
/// CORS configuration.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CorsConfig {
    pub credentials: bool,
    /// Origin patterns; validated during deserialization by
    /// `deserialize_and_validate_cors_origins`.
    #[serde(deserialize_with = "deserialize_and_validate_cors_origins")]
    pub origin: Vec<String>,
    pub methods: Vec<String>,
    pub allowed_headers: Vec<String>,
}
680
681fn deserialize_and_validate_cors_origins<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
682where
683 D: serde::Deserializer<'de>,
684{
685 use serde::de::Error;
686 let origins = Vec::<String>::deserialize(deserializer)?;
687
688 if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&origins) {
689 return Err(D::Error::custom(format!(
690 "CORS origin pattern validation failed: {}",
691 e
692 )));
693 }
694
695 Ok(origins)
696}
697
/// Connection settings for every supported database.
///
/// `Default` is derived, and with `#[serde(default)]` any missing section
/// takes its value from that derived default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct DatabaseConfig {
    pub mysql: DatabaseConnection,
    pub postgres: DatabaseConnection,
    pub redis: RedisConnection,
    pub dynamodb: DynamoDbSettings,
    pub surrealdb: SurrealDbSettings,
    pub scylladb: ScyllaDbSettings,
}
708
/// Connection settings for a SQL database (used for both MySQL and Postgres).
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabaseConnection {
    pub host: String,
    pub port: u16,
    pub username: String,
    pub password: String,
    pub database: String,
    pub table_name: String,
    pub connection_pool_size: u32,
    /// Can be overridden from `<PREFIX>_POOL_MIN` by `override_db_pool_settings`.
    pub pool_min: Option<u32>,
    /// Can be overridden from `<PREFIX>_POOL_MAX` by `override_db_pool_settings`.
    pub pool_max: Option<u32>,
    // Units for the two durations below are not visible here; presumably seconds - confirm.
    pub cache_ttl: u64,
    pub cache_cleanup_interval: u64,
    pub cache_max_capacity: u64,
}
725
/// Redis connection settings covering standalone, sentinel and cluster setups.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisConnection {
    /// Host name; may carry a `redis://` or `rediss://` prefix, which then
    /// selects the URL scheme.
    pub host: String,
    pub port: u16,
    /// Database index appended to the URL path.
    pub db: u32,
    pub username: Option<String>,
    pub password: Option<String>,
    pub key_prefix: String,
    /// When non-empty, `to_url` builds a `redis+sentinel://` URL.
    pub sentinels: Vec<RedisSentinel>,
    pub sentinel_password: Option<String>,
    /// Placed in the sentinel URL path after the sentinel hosts; presumably
    /// the sentinel master name - confirm.
    pub name: String,
    /// Cluster settings; its `nodes` take precedence over `cluster_nodes`.
    pub cluster: RedisClusterConnection,
    /// Used only when `cluster.nodes` is empty.
    pub cluster_nodes: Vec<ClusterNode>,
}
742
/// Address of a single Redis sentinel.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisSentinel {
    pub host: String,
    pub port: u16,
}
749
/// Redis cluster connection settings.
///
/// `Default` is derived: no nodes, no credentials, TLS off.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RedisClusterConnection {
    pub nodes: Vec<ClusterNode>,
    /// Takes precedence over `RedisConnection::username` when building node URLs.
    pub username: Option<String>,
    /// Takes precedence over `RedisConnection::password` when building node URLs.
    pub password: Option<String>,
    /// Selects the `rediss` scheme for nodes given as plain hosts. Also
    /// accepted under the key `useTLS` when deserializing.
    #[serde(alias = "useTLS")]
    pub use_tls: bool,
}
759
760impl RedisConnection {
761 pub fn is_sentinel_configured(&self) -> bool {
763 !self.sentinels.is_empty()
764 }
765
766 pub fn to_url(&self) -> String {
768 if self.is_sentinel_configured() {
769 self.build_sentinel_url()
770 } else {
771 self.build_standard_url()
772 }
773 }
774
775 fn build_standard_url(&self) -> String {
776 let (scheme, host) = if self.host.starts_with("rediss://") {
778 ("rediss://", self.host.trim_start_matches("rediss://"))
779 } else if self.host.starts_with("redis://") {
780 ("redis://", self.host.trim_start_matches("redis://"))
781 } else {
782 ("redis://", self.host.as_str())
783 };
784
785 let mut url = String::from(scheme);
786
787 if let Some(ref username) = self.username {
788 url.push_str(username);
789 if let Some(ref password) = self.password {
790 url.push(':');
791 url.push_str(&urlencoding::encode(password));
792 }
793 url.push('@');
794 } else if let Some(ref password) = self.password {
795 url.push(':');
796 url.push_str(&urlencoding::encode(password));
797 url.push('@');
798 }
799
800 url.push_str(host);
801 url.push(':');
802 url.push_str(&self.port.to_string());
803 url.push('/');
804 url.push_str(&self.db.to_string());
805
806 url
807 }
808
809 fn build_sentinel_url(&self) -> String {
810 let mut url = String::from("redis+sentinel://");
811
812 if let Some(ref sentinel_password) = self.sentinel_password {
813 url.push(':');
814 url.push_str(&urlencoding::encode(sentinel_password));
815 url.push('@');
816 }
817
818 let sentinel_hosts: Vec<String> = self
819 .sentinels
820 .iter()
821 .map(|s| format!("{}:{}", s.host, s.port))
822 .collect();
823 url.push_str(&sentinel_hosts.join(","));
824
825 url.push('/');
826 url.push_str(&self.name);
827 url.push('/');
828 url.push_str(&self.db.to_string());
829
830 let mut params = Vec::new();
831 if let Some(ref password) = self.password {
832 params.push(format!("password={}", urlencoding::encode(password)));
833 }
834 if let Some(ref username) = self.username {
835 params.push(format!("username={}", urlencoding::encode(username)));
836 }
837
838 if !params.is_empty() {
839 url.push('?');
840 url.push_str(¶ms.join("&"));
841 }
842
843 url
844 }
845
846 pub fn has_cluster_nodes(&self) -> bool {
849 !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
850 }
851
852 pub fn cluster_node_urls(&self) -> Vec<String> {
855 if !self.cluster.nodes.is_empty() {
856 return self.build_cluster_urls(&self.cluster.nodes);
857 }
858 self.build_cluster_urls(&self.cluster_nodes)
859 }
860
861 pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
864 self.build_cluster_urls(
865 &seeds
866 .iter()
867 .filter_map(|seed| ClusterNode::from_seed(seed))
868 .collect::<Vec<ClusterNode>>(),
869 )
870 }
871
872 fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
873 let username = self
874 .cluster
875 .username
876 .as_deref()
877 .or(self.username.as_deref());
878 let password = self
879 .cluster
880 .password
881 .as_deref()
882 .or(self.password.as_deref());
883 let use_tls = self.cluster.use_tls;
884
885 nodes
886 .iter()
887 .map(|node| node.to_url_with_options(use_tls, username, password))
888 .collect()
889 }
890}
891
892impl RedisSentinel {
893 pub fn to_host_port(&self) -> String {
894 format!("{}:{}", self.host, self.port)
895 }
896}
897
/// A single Redis cluster node.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ClusterNode {
    /// Plain host, `host:port`, bracketed IPv6, or a full `redis://` /
    /// `rediss://` URL.
    pub host: String,
    /// Port used when `host` does not carry one itself.
    pub port: u16,
}
904
905impl ClusterNode {
906 pub fn to_url(&self) -> String {
907 self.to_url_with_options(false, None, None)
908 }
909
910 pub fn to_url_with_options(
911 &self,
912 use_tls: bool,
913 username: Option<&str>,
914 password: Option<&str>,
915 ) -> String {
916 let host = self.host.trim();
917
918 if host.starts_with("redis://") || host.starts_with("rediss://") {
919 if let Ok(parsed) = Url::parse(host)
920 && let Some(host_str) = parsed.host_str()
921 {
922 let scheme = parsed.scheme();
923 let port = parsed.port_or_known_default().unwrap_or(self.port);
924 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
925 let parsed_password = parsed.password();
926 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
927 let (effective_username, effective_password) = if has_embedded_auth {
928 (parsed_username, parsed_password)
929 } else {
930 (username, password)
931 };
932
933 return build_redis_url(
934 scheme,
935 host_str,
936 port,
937 effective_username,
938 effective_password,
939 );
940 }
941
942 let has_port = if let Some(bracket_pos) = host.rfind(']') {
944 host[bracket_pos..].contains(':')
945 } else {
946 host.split(':').count() >= 3
947 };
948 let base = if has_port {
949 host.to_string()
950 } else {
951 format!("{}:{}", host, self.port)
952 };
953
954 if let Ok(parsed) = Url::parse(&base) {
955 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
956 let parsed_password = parsed.password();
957 if let Some(host_str) = parsed.host_str() {
958 let port = parsed.port_or_known_default().unwrap_or(self.port);
959 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
960 let (effective_username, effective_password) = if has_embedded_auth {
961 (parsed_username, parsed_password)
962 } else {
963 (username, password)
964 };
965 return build_redis_url(
966 parsed.scheme(),
967 host_str,
968 port,
969 effective_username,
970 effective_password,
971 );
972 }
973 }
974 return base;
975 }
976
977 let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
978 let scheme = if use_tls { "rediss" } else { "redis" };
979 build_redis_url(
980 scheme,
981 &normalized_host,
982 normalized_port,
983 username,
984 password,
985 )
986 }
987
988 pub fn from_seed(seed: &str) -> Option<Self> {
989 let trimmed = seed.trim();
990 if trimmed.is_empty() {
991 return None;
992 }
993
994 if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
995 let port = Url::parse(trimmed)
996 .ok()
997 .and_then(|parsed| parsed.port_or_known_default())
998 .unwrap_or(6379);
999 return Some(Self {
1000 host: trimmed.to_string(),
1001 port,
1002 });
1003 }
1004
1005 let (host, port) = split_plain_host_and_port(trimmed, 6379);
1006 Some(Self { host, port })
1007 }
1008}
1009
/// Splits a scheme-less host string into host and port.
///
/// Recognized forms (after trimming whitespace):
/// * `[v6]` or `[v6]:port` - the brackets are removed from the host;
/// * `host:port` with exactly one colon and a valid `u16` port.
///
/// Anything else, including bare IPv6 addresses and unparsable ports, keeps
/// the text as the host and uses `default_port`.
fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
    let host = raw_host.trim();

    // Bracketed form: the host is whatever sits between '[' and the first ']'.
    if let Some(after_open) = host.strip_prefix('[') {
        return match after_open.split_once(']') {
            Some((inner, remainder)) => {
                let port = remainder
                    .strip_prefix(':')
                    .and_then(|digits| digits.parse::<u16>().ok())
                    .unwrap_or(default_port);
                (inner.to_string(), port)
            }
            // No closing bracket: leave the text untouched.
            None => (host.to_string(), default_port),
        };
    }

    // Exactly one colon means `host:port`; more would be a bare IPv6 address.
    let host_and_port = host
        .split_once(':')
        .filter(|(_, tail)| !tail.contains(':'))
        .and_then(|(name, tail)| tail.parse::<u16>().ok().map(|port| (name, port)));

    match host_and_port {
        Some((name, port)) => (name.to_string(), port),
        None => (host.to_string(), default_port),
    }
}
1038
1039fn build_redis_url(
1040 scheme: &str,
1041 host: &str,
1042 port: u16,
1043 username: Option<&str>,
1044 password: Option<&str>,
1045) -> String {
1046 let mut url = format!("{scheme}://");
1047
1048 if let Some(user) = username {
1049 url.push_str(&urlencoding::encode(user));
1050 if let Some(pass) = password {
1051 url.push(':');
1052 url.push_str(&urlencoding::encode(pass));
1053 }
1054 url.push('@');
1055 } else if let Some(pass) = password {
1056 url.push(':');
1057 url.push_str(&urlencoding::encode(pass));
1058 url.push('@');
1059 }
1060
1061 if host.contains(':') && !host.starts_with('[') {
1062 url.push('[');
1063 url.push_str(host);
1064 url.push(']');
1065 } else {
1066 url.push_str(host);
1067 }
1068 url.push(':');
1069 url.push_str(&port.to_string());
1070 url
1071}
1072
/// Database pooling settings.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabasePooling {
    pub enabled: bool,
    pub min: u32,
    pub max: u32,
}
1080
/// Limits that apply to events.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EventLimits {
    pub max_channels_at_once: u32,
    pub max_name_length: u32,
    /// Maximum payload size in kilobytes.
    pub max_payload_in_kb: u32,
    pub max_batch_size: u32,
}
1089
/// Idempotency settings.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct IdempotencyConfig {
    pub enabled: bool,
    /// Time-to-live in seconds.
    pub ttl_seconds: u64,
    pub max_key_length: usize,
}
1100
/// Toggle for the ephemeral feature.
///
/// With `#[serde(default)]`, a missing `enabled` is taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EphemeralConfig {
    pub enabled: bool,
}
1107
/// Echo control settings.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EchoControlConfig {
    pub enabled: bool,
    pub default_echo_messages: bool,
}
1116
/// Event name filtering settings.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EventNameFilteringConfig {
    pub enabled: bool,
    pub max_events_per_filter: usize,
    pub max_event_name_length: usize,
}
1127
/// Connection recovery settings.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ConnectionRecoveryConfig {
    pub enabled: bool,
    /// Buffer time-to-live in seconds.
    pub buffer_ttl_seconds: u64,
    pub max_buffer_size: usize,
}
1141
/// Identifies which backend is selected for the version store.
///
/// Serde uses the lowercased variant name (`DynamoDb` becomes `"dynamodb"`,
/// and so on). `Memory` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum VersionStoreDriver {
    #[default]
    Memory,
    Postgres,
    Mysql,
    DynamoDb,
    ScyllaDb,
    SurrealDb,
}
1153
1154impl FromStr for VersionStoreDriver {
1155 type Err = String;
1156 fn from_str(s: &str) -> Result<Self, Self::Err> {
1157 match s.to_lowercase().as_str() {
1158 "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1159 "mysql" => Ok(Self::Mysql),
1160 "dynamodb" => Ok(Self::DynamoDb),
1161 "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1162 "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1163 "memory" => Ok(Self::Memory),
1164 _ => Err(format!("Unknown version store driver: {s}")),
1165 }
1166 }
1167}
1168
/// Versioned messages settings.
///
/// With `#[serde(default)]`, any field missing from the input is taken from
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct VersionedMessagesConfig {
    pub enabled: bool,
    pub driver: VersionStoreDriver,
    pub max_page_size: usize,
    /// Retention window in seconds.
    pub retention_window_seconds: u64,
    /// Purge interval in seconds.
    pub purge_interval_seconds: u64,
    pub purge_batch_size: usize,
    pub max_purge_per_tick: usize,
}
1197
/// Toggle for the annotations feature.
///
/// `Default` is derived, so `enabled` defaults to `false`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct AnnotationsConfig {
    pub enabled: bool,
}
1205
/// Identifies which backend is selected for history.
///
/// Serde uses the lowercased variant name (`DynamoDb` becomes `"dynamodb"`,
/// and so on). `Postgres` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum HistoryBackend {
    #[default]
    Postgres,
    Mysql,
    DynamoDb,
    SurrealDb,
    ScyllaDb,
    Memory,
}
1217
1218impl FromStr for HistoryBackend {
1219 type Err = String;
1220 fn from_str(s: &str) -> Result<Self, Self::Err> {
1221 match s.to_lowercase().as_str() {
1222 "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1223 "mysql" => Ok(Self::Mysql),
1224 "dynamodb" => Ok(Self::DynamoDb),
1225 "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1226 "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1227 "memory" => Ok(Self::Memory),
1228 _ => Err(format!("Unknown history backend: {s}")),
1229 }
1230 }
1231}
1232
/// Postgres-specific history settings.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PostgresHistoryConfig {
    /// Table name prefix; defaults to `sockudo_history`.
    pub table_prefix: String,
    /// Write timeout in milliseconds (unit per the field name); defaults to 5000.
    pub write_timeout_ms: u64,
}
1239
1240impl Default for PostgresHistoryConfig {
1241 fn default() -> Self {
1242 Self {
1243 table_prefix: "sockudo_history".to_string(),
1244 write_timeout_ms: 5000,
1245 }
1246 }
1247}
1248
/// MySQL-specific history settings.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MySqlHistoryConfig {
    /// Table name prefix; defaults to `sockudo_history`.
    pub table_prefix: String,
    /// Write timeout in milliseconds (unit per the field name); defaults to 5000.
    pub write_timeout_ms: u64,
}
1255
1256impl Default for MySqlHistoryConfig {
1257 fn default() -> Self {
1258 Self {
1259 table_prefix: "sockudo_history".to_string(),
1260 write_timeout_ms: 5000,
1261 }
1262 }
1263}
1264
/// DynamoDB-specific history settings.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DynamoDbHistoryConfig {
    /// Table name prefix; defaults to `sockudo_history`.
    pub table_prefix: String,
    /// Write timeout in milliseconds (unit per the field name); defaults to 5000.
    pub write_timeout_ms: u64,
}
1271
1272impl Default for DynamoDbHistoryConfig {
1273 fn default() -> Self {
1274 Self {
1275 table_prefix: "sockudo_history".to_string(),
1276 write_timeout_ms: 5000,
1277 }
1278 }
1279}
1280
/// SurrealDB-specific history settings.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SurrealDbHistoryConfig {
    /// Table name prefix; defaults to `sockudo_history`.
    pub table_prefix: String,
    /// Write timeout in milliseconds (unit per the field name); defaults to 5000.
    pub write_timeout_ms: u64,
}
1287
1288impl Default for SurrealDbHistoryConfig {
1289 fn default() -> Self {
1290 Self {
1291 table_prefix: "sockudo_history".to_string(),
1292 write_timeout_ms: 5000,
1293 }
1294 }
1295}
1296
/// ScyllaDB-specific history settings.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ScyllaDbHistoryConfig {
    /// Table name prefix; defaults to `sockudo_history`.
    pub table_prefix: String,
    /// Write timeout in milliseconds (unit per the field name); defaults to 5000.
    pub write_timeout_ms: u64,
}
1303
1304impl Default for ScyllaDbHistoryConfig {
1305 fn default() -> Self {
1306 Self {
1307 table_prefix: "sockudo_history".to_string(),
1308 write_timeout_ms: 5000,
1309 }
1310 }
1311}
1312
/// Settings for message history.
///
/// Omitted fields take the values from this type's `Default` impl. One nested
/// section exists per non-memory backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HistoryConfig {
    /// Feature toggle; defaults to `false`.
    pub enabled: bool,
    /// Rewind toggle; defaults to `true`.
    pub rewind_enabled: bool,
    /// Selected backend; defaults to `Postgres`.
    pub backend: HistoryBackend,
    /// Retention window in seconds (unit per the field name); defaults to 86400.
    pub retention_window_seconds: u64,
    /// Maximum page size; defaults to 100.
    pub max_page_size: usize,
    /// Optional per-channel message cap; unset by default.
    pub max_messages_per_channel: Option<usize>,
    /// Optional per-channel byte cap; unset by default.
    pub max_bytes_per_channel: Option<u64>,
    /// Number of writer shards; defaults to 16.
    pub writer_shards: usize,
    /// Writer queue capacity; defaults to 4096.
    pub writer_queue_capacity: usize,
    /// Purge interval in seconds (unit per the field name); defaults to 300.
    pub purge_interval_seconds: u64,
    /// Purge batch size; defaults to 1000.
    pub purge_batch_size: usize,
    /// Maximum purged per tick; defaults to 100 000.
    pub max_purge_per_tick: usize,
    /// Postgres backend settings.
    pub postgres: PostgresHistoryConfig,
    /// MySQL backend settings.
    pub mysql: MySqlHistoryConfig,
    /// DynamoDB backend settings.
    pub dynamodb: DynamoDbHistoryConfig,
    /// SurrealDB backend settings.
    pub surrealdb: SurrealDbHistoryConfig,
    /// ScyllaDB backend settings.
    pub scylladb: ScyllaDbHistoryConfig,
}
1334
impl Default for HistoryConfig {
    /// History is disabled by default; the remaining values are the settings
    /// used when a config omits the corresponding field.
    fn default() -> Self {
        Self {
            enabled: false,
            rewind_enabled: true,
            backend: HistoryBackend::Postgres,
            // 86400 seconds = 24 hours.
            retention_window_seconds: 86400,
            max_page_size: 100,
            // No per-channel caps unless configured.
            max_messages_per_channel: None,
            max_bytes_per_channel: None,
            writer_shards: 16,
            writer_queue_capacity: 4096,
            purge_interval_seconds: 300,
            purge_batch_size: 1000,
            max_purge_per_tick: 100_000,
            // Every backend section starts from its own defaults.
            postgres: PostgresHistoryConfig::default(),
            mysql: MySqlHistoryConfig::default(),
            dynamodb: DynamoDbHistoryConfig::default(),
            surrealdb: SurrealDbHistoryConfig::default(),
            scylladb: ScyllaDbHistoryConfig::default(),
        }
    }
}
1358
impl Default for VersionedMessagesConfig {
    /// Versioned messages are disabled by default and use the in-memory driver.
    fn default() -> Self {
        Self {
            enabled: false,
            driver: VersionStoreDriver::Memory,
            max_page_size: 100,
            // NOTE(review): 0 presumably means "no retention window" — confirm with the consumer.
            retention_window_seconds: 0,
            purge_interval_seconds: 300,
            purge_batch_size: 1000,
            max_purge_per_tick: 100_000,
        }
    }
}
1372
/// Storage driver selectable for push data.
///
/// With `rename_all = "lowercase"` the variants (de)serialize as `memory`,
/// `postgres`, `mysql`, `dynamodb`, `surrealdb` and `scylladb`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum PushStorageDriver {
    /// Default driver.
    #[default]
    Memory,
    Postgres,
    Mysql,
    DynamoDb,
    SurrealDb,
    ScyllaDb,
}
1384
1385impl FromStr for PushStorageDriver {
1386 type Err = String;
1387 fn from_str(s: &str) -> Result<Self, Self::Err> {
1388 match s.to_lowercase().as_str() {
1389 "memory" => Ok(Self::Memory),
1390 "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1391 "mysql" => Ok(Self::Mysql),
1392 "dynamodb" => Ok(Self::DynamoDb),
1393 "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1394 "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1395 _ => Err(format!("Unknown push storage driver: {s}")),
1396 }
1397 }
1398}
1399
/// Queue driver selectable for push delivery.
///
/// Variants (de)serialize in lowercase (`memory`, `redis`, `nats`, `pulsar`,
/// `rabbitmq`, `kafka`, `iggy`, `sqs`, `sns`), except the two explicitly
/// renamed to the hyphenated forms `redis-cluster` and `google-pubsub`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum PushQueueDriver {
    /// Default driver.
    #[default]
    Memory,
    Redis,
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    Nats,
    Pulsar,
    RabbitMq,
    #[serde(rename = "google-pubsub")]
    GooglePubsub,
    Kafka,
    Iggy,
    Sqs,
    Sns,
}
1418
1419impl FromStr for PushQueueDriver {
1420 type Err = String;
1421 fn from_str(s: &str) -> Result<Self, Self::Err> {
1422 match s.to_lowercase().as_str() {
1423 "memory" => Ok(Self::Memory),
1424 "redis" => Ok(Self::Redis),
1425 "redis-cluster" | "redis_cluster" => Ok(Self::RedisCluster),
1426 "nats" => Ok(Self::Nats),
1427 "pulsar" => Ok(Self::Pulsar),
1428 "rabbitmq" | "rabbit-mq" => Ok(Self::RabbitMq),
1429 "google-pubsub" | "google_pubsub" | "gcp-pubsub" | "pubsub" => Ok(Self::GooglePubsub),
1430 "kafka" => Ok(Self::Kafka),
1431 "iggy" | "apache-iggy" | "apache_iggy" => Ok(Self::Iggy),
1432 "sqs" => Ok(Self::Sqs),
1433 "sns" => Ok(Self::Sns),
1434 _ => Err(format!("Unknown push queue driver: {s}")),
1435 }
1436 }
1437}
1438
/// Settings for push notifications.
///
/// Omitted fields take the values from this type's `Default` impl.
///
/// NOTE(review): the derived `Debug` and `Serialize` impls include
/// `credential_encryption_key` verbatim — confirm this value is never logged
/// or dumped.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PushConfig {
    // Drivers; both default to `Memory`.
    pub storage_driver: PushStorageDriver,
    pub queue_driver: PushQueueDriver,

    // Per-provider toggles; all default to `false`.
    pub fcm_enabled: bool,
    pub apns_enabled: bool,
    pub webpush_enabled: bool,
    pub hms_enabled: bool,
    pub wns_enabled: bool,

    // Optional per-provider credential references; all unset by default.
    pub fcm_credential_ref: Option<String>,
    pub apns_credential_ref: Option<String>,
    pub webpush_credential_ref: Option<String>,
    pub hms_credential_ref: Option<String>,
    pub wns_credential_ref: Option<String>,

    // Worker, partition and shard counts; all default to 1.
    pub accept_worker_count: u32,
    pub planner_worker_count: u32,
    pub shard_worker_count: u32,
    pub dispatch_worker_count: u32,
    pub feedback_worker_count: u32,
    pub queue_partition_count: u32,
    pub channel_shard_count: u32,

    // Fan-out tuning; defaults are 10 000, 100 000 and 0 respectively.
    pub fanout_fast_threshold: u64,
    pub fanout_shard_size: u64,
    pub fanout_sync_threshold: u64,

    /// Backpressure lag threshold in seconds (unit per the field name); defaults to 60.
    pub backpressure_lag_threshold_secs: u64,
    /// Publish-status TTL in days (unit per the field name); defaults to 30.
    pub publish_status_ttl_days: u64,
    /// Stale-device maximum age in days (unit per the field name); defaults to 90.
    pub stale_device_max_age_days: u64,

    /// Retry settings.
    pub retry: PushRetryConfig,
    /// Circuit breaker settings.
    pub circuit_breaker: PushCircuitBreakerConfig,
    /// Default quota settings.
    pub default_quotas: PushDefaultQuotas,

    // Optional secret-management references; all unset by default.
    pub credential_encryption_key: Option<String>,
    pub kms_key_ref: Option<String>,
    pub vault_secret_ref: Option<String>,

    /// Dry-run toggle; defaults to `false`.
    pub dry_run: bool,
    /// Analytics toggle; defaults to `false`.
    pub analytics_enabled: bool,
    /// Analytics retention in days (unit per the field name); defaults to 30.
    pub analytics_retention_days: u64,
    /// Payload redaction settings.
    pub payload_redaction: PushPayloadRedactionConfig,
    /// Scheduler interval in seconds (unit per the field name); defaults to 5.
    pub scheduler_interval_secs: u64,
}
1479
impl Default for PushConfig {
    /// In-memory drivers, every provider disabled, one worker per stage, and
    /// nested sections at their own defaults.
    fn default() -> Self {
        Self {
            storage_driver: PushStorageDriver::Memory,
            queue_driver: PushQueueDriver::Memory,
            // Every provider is disabled until configured.
            fcm_enabled: false,
            apns_enabled: false,
            webpush_enabled: false,
            hms_enabled: false,
            wns_enabled: false,
            fcm_credential_ref: None,
            apns_credential_ref: None,
            webpush_credential_ref: None,
            hms_credential_ref: None,
            wns_credential_ref: None,
            // One worker / partition / shard per stage.
            accept_worker_count: 1,
            planner_worker_count: 1,
            shard_worker_count: 1,
            dispatch_worker_count: 1,
            feedback_worker_count: 1,
            queue_partition_count: 1,
            channel_shard_count: 1,
            fanout_fast_threshold: 10_000,
            fanout_shard_size: 100_000,
            fanout_sync_threshold: 0,
            backpressure_lag_threshold_secs: 60,
            publish_status_ttl_days: 30,
            stale_device_max_age_days: 90,
            retry: PushRetryConfig::default(),
            circuit_breaker: PushCircuitBreakerConfig::default(),
            default_quotas: PushDefaultQuotas::default(),
            // No secret-management references unless configured.
            credential_encryption_key: None,
            kms_key_ref: None,
            vault_secret_ref: None,
            dry_run: false,
            analytics_enabled: false,
            analytics_retention_days: 30,
            payload_redaction: PushPayloadRedactionConfig::default(),
            scheduler_interval_secs: 5,
        }
    }
}
1522
/// Retry settings for push delivery.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PushRetryConfig {
    /// Maximum attempts; defaults to 5.
    pub max_attempts: u32,
    /// Initial backoff in milliseconds (unit per the field name); defaults to 1000.
    pub initial_backoff_ms: u64,
    /// Maximum backoff in milliseconds (unit per the field name); defaults to 60 000.
    pub max_backoff_ms: u64,
    /// Maximum elapsed time in seconds (unit per the field name); defaults to 86 400.
    pub max_elapsed_secs: u64,
    /// Jitter toggle; defaults to `true`.
    pub jitter: bool,
    /// Defaults to `true`.
    /// NOTE(review): presumably honours a provider's Retry-After hint — confirm.
    pub respect_retry_after: bool,
}
1533
impl Default for PushRetryConfig {
    /// Five attempts, 1 s initial and 60 s maximum backoff, 86 400 s maximum
    /// elapsed time, with jitter and `respect_retry_after` both on.
    fn default() -> Self {
        Self {
            max_attempts: 5,
            initial_backoff_ms: 1_000,
            max_backoff_ms: 60_000,
            max_elapsed_secs: 86_400,
            jitter: true,
            respect_retry_after: true,
        }
    }
}
1546
/// Circuit breaker settings for push delivery.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PushCircuitBreakerConfig {
    /// Failure threshold; defaults to 5.
    pub failure_threshold: u32,
    /// Cooldown in seconds (unit per the field name); defaults to 60.
    pub cooldown_secs: u64,
    /// Maximum in-flight while half-open; defaults to 10.
    pub half_open_max_inflight: u32,
}
1554
impl Default for PushCircuitBreakerConfig {
    /// Threshold of 5 failures, 60 s cooldown, 10 in-flight while half-open.
    fn default() -> Self {
        Self {
            failure_threshold: 5,
            cooldown_secs: 60,
            half_open_max_inflight: 10,
        }
    }
}
1564
/// Default push quotas.
///
/// Omitted fields take the values from this type's `Default` impl.
/// NOTE(review): whether 0 means "unlimited" for the quota fields is not
/// visible here — confirm with the enforcement code.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PushDefaultQuotas {
    /// Acceptance rate; defaults to 100.
    pub acceptance_rps: u64,
    /// Daily delivery quota; defaults to 0.
    pub delivery_quota_daily: u64,
    /// Fan-out maximum; defaults to 0.
    pub fanout_max: u64,
    /// In-flight maximum; defaults to 1000.
    pub inflight_max: u64,
}
1573
impl Default for PushDefaultQuotas {
    /// `acceptance_rps` 100 and `inflight_max` 1000; the daily delivery quota
    /// and fan-out maximum start at 0.
    fn default() -> Self {
        Self {
            acceptance_rps: 100,
            delivery_quota_daily: 0,
            fanout_max: 0,
            inflight_max: 1_000,
        }
    }
}
1584
/// Redaction settings for push payloads.
///
/// Omitted fields take the values from this type's `Default` impl, which
/// enables every redaction flag and disables debug payload logging.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PushPayloadRedactionConfig {
    /// Defaults to `true`.
    pub redact_payload: bool,
    /// Defaults to `true`.
    pub redact_template_data: bool,
    /// Defaults to `true`.
    pub redact_provider_overrides: bool,
    /// Defaults to `false`.
    pub allow_debug_payload_logging: bool,
}
1593
impl Default for PushPayloadRedactionConfig {
    /// Redact everything and keep debug payload logging off.
    fn default() -> Self {
        Self {
            redact_payload: true,
            redact_template_data: true,
            redact_provider_overrides: true,
            allow_debug_payload_logging: false,
        }
    }
}
1604
/// Settings for presence history.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PresenceHistoryConfig {
    /// Feature toggle; defaults to `false`.
    pub enabled: bool,
    /// Retention window in seconds (unit per the field name); defaults to 86400.
    pub retention_window_seconds: u64,
    /// Maximum page size; defaults to 100.
    pub max_page_size: usize,
    /// Optional per-channel event cap; unset by default.
    pub max_events_per_channel: Option<usize>,
    /// Optional per-channel byte cap; unset by default.
    pub max_bytes_per_channel: Option<u64>,
}
1614
impl Default for PresenceHistoryConfig {
    /// Disabled by default, with an 86400 s retention window, pages of 100 and
    /// no per-channel caps.
    fn default() -> Self {
        Self {
            enabled: false,
            // 86400 seconds = 24 hours.
            retention_window_seconds: 86400,
            max_page_size: 100,
            max_events_per_channel: None,
            max_bytes_per_channel: None,
        }
    }
}
1626
/// Settings for the HTTP API.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HttpApiConfig {
    /// Request size limit in megabytes (unit per the field name); defaults to 10.
    pub request_limit_in_mb: u32,
    /// Traffic acceptance settings.
    pub accept_traffic: AcceptTraffic,
    /// Usage toggle; defaults to `true`.
    pub usage_enabled: bool,
}
1634
/// Traffic acceptance settings.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AcceptTraffic {
    /// Memory threshold; defaults to 0.90.
    /// NOTE(review): presumably a fraction of available memory — confirm.
    pub memory_threshold: f64,
}
1640
/// Identity of this server instance.
///
/// When `process_id` is omitted, the `Default` impl generates a fresh UUID v4
/// string.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct InstanceConfig {
    /// Process identifier.
    pub process_id: String,
}
1646
/// Settings for metrics.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MetricsConfig {
    /// Feature toggle; defaults to `true`.
    pub enabled: bool,
    /// Metrics driver; defaults to `MetricsDriver::default()`.
    pub driver: MetricsDriver,
    /// Host; defaults to `0.0.0.0`.
    pub host: String,
    /// Prometheus settings.
    pub prometheus: PrometheusConfig,
    /// TCP exporter settings.
    pub tcp_exporter: MetricsTcpExporterConfig,
    /// Port; defaults to 9601.
    pub port: u16,
}
1657
/// Prometheus settings.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PrometheusConfig {
    /// Prefix; defaults to `sockudo_`.
    pub prefix: String,
}
1663
/// Settings for the metrics TCP exporter.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MetricsTcpExporterConfig {
    /// Feature toggle; defaults to `false`.
    pub enabled: bool,
    /// Host; defaults to `127.0.0.1`.
    pub host: String,
    /// Port; defaults to 5000.
    pub port: u16,
    /// Optional buffer size; defaults to `Some(1024)`.
    pub buffer_size: Option<usize>,
}
1672
/// Logging output settings.
///
/// Omitted fields take the values from this type's `Default` impl (both `true`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingConfig {
    /// Colour toggle.
    pub colors_enabled: bool,
    /// Target toggle.
    pub include_target: bool,
}
1679
/// Presence channel limits.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PresenceConfig {
    /// Maximum members per channel; defaults to 100.
    pub max_members_per_channel: u32,
    /// Maximum member size in kilobytes (unit per the field name); defaults to 2.
    pub max_member_size_in_kb: u32,
}
1686
/// WebSocket buffer and transport settings.
///
/// Omitted fields take the values from this type's `Default` impl. The
/// buffer fields feed `to_buffer_config`; the remaining fields feed
/// `to_sockudo_ws_config`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebSocketConfig {
    /// Optional buffer limit in messages; defaults to `Some(1000)`.
    pub max_messages: Option<usize>,
    /// Optional buffer limit in bytes; unset by default. When set it is also
    /// used as the maximum payload length in `to_sockudo_ws_config`.
    pub max_bytes: Option<usize>,
    /// Passed through as `disconnect_on_full`; defaults to `true`.
    pub disconnect_on_buffer_full: bool,
    /// Maximum message size; defaults to 64 * 1024 * 1024.
    pub max_message_size: usize,
    /// Maximum frame size; defaults to 16 * 1024 * 1024.
    pub max_frame_size: usize,
    /// Write buffer size; defaults to 16 * 1024.
    pub write_buffer_size: usize,
    /// Maximum backpressure; defaults to 1024 * 1024.
    pub max_backpressure: usize,
    /// Auto-ping toggle; defaults to `true`.
    pub auto_ping: bool,
    /// Ping interval; defaults to 30.
    pub ping_interval: u32,
    /// Idle timeout; defaults to 120.
    pub idle_timeout: u32,
    /// Compression mode name, matched case-insensitively; unrecognised values
    /// mean disabled. Defaults to `disabled`.
    pub compression: String,
}
1704
1705impl Default for WebSocketConfig {
1706 fn default() -> Self {
1707 Self {
1708 max_messages: Some(1000),
1709 max_bytes: None,
1710 disconnect_on_buffer_full: true,
1711 max_message_size: 64 * 1024 * 1024,
1712 max_frame_size: 16 * 1024 * 1024,
1713 write_buffer_size: 16 * 1024,
1714 max_backpressure: 1024 * 1024,
1715 auto_ping: true,
1716 ping_interval: 30,
1717 idle_timeout: 120,
1718 compression: "disabled".to_string(),
1719 }
1720 }
1721}
1722
1723impl WebSocketConfig {
1724 pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
1726 use crate::websocket::{BufferLimit, WebSocketBufferConfig};
1727
1728 let limit = match (self.max_messages, self.max_bytes) {
1729 (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
1730 (Some(messages), None) => BufferLimit::Messages(messages),
1731 (None, Some(bytes)) => BufferLimit::Bytes(bytes),
1732 (None, None) => BufferLimit::Messages(1000),
1733 };
1734
1735 WebSocketBufferConfig {
1736 limit,
1737 disconnect_on_full: self.disconnect_on_buffer_full,
1738 }
1739 }
1740
1741 pub fn to_sockudo_ws_config(
1743 &self,
1744 websocket_max_payload_kb: u32,
1745 activity_timeout: u64,
1746 ) -> sockudo_ws::Config {
1747 use sockudo_ws::Compression;
1748
1749 let compression = match self.compression.to_lowercase().as_str() {
1750 "dedicated" => Compression::Dedicated,
1751 "shared" => Compression::Shared,
1752 "window256b" => Compression::Window256B,
1753 "window1kb" => Compression::Window1KB,
1754 "window2kb" => Compression::Window2KB,
1755 "window4kb" => Compression::Window4KB,
1756 "window8kb" => Compression::Window8KB,
1757 "window16kb" => Compression::Window16KB,
1758 "window32kb" => Compression::Window32KB,
1759 _ => Compression::Disabled,
1760 };
1761
1762 sockudo_ws::Config::builder()
1763 .max_payload_length(
1764 self.max_bytes
1765 .unwrap_or(websocket_max_payload_kb as usize * 1024),
1766 )
1767 .max_message_size(self.max_message_size)
1768 .max_frame_size(self.max_frame_size)
1769 .write_buffer_size(self.write_buffer_size)
1770 .max_backpressure(self.max_backpressure)
1771 .idle_timeout(self.idle_timeout)
1772 .auto_ping(self.auto_ping)
1773 .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1774 .compression(compression)
1775 .build()
1776 }
1777}
1778
/// Queue settings: the selected driver plus one settings section per driver.
///
/// `Default` is derived, so every field starts from its own type's default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct QueueConfig {
    /// Selected queue driver.
    pub driver: QueueDriver,
    /// Redis queue settings.
    pub redis: RedisQueueConfig,
    /// Redis Cluster queue settings.
    pub redis_cluster: RedisClusterQueueConfig,
    /// NATS settings (shares the adapter config type).
    pub nats: NatsAdapterConfig,
    /// Pulsar settings (shares the adapter config type).
    pub pulsar: PulsarAdapterConfig,
    /// RabbitMQ settings (shares the adapter config type).
    pub rabbitmq: RabbitMqAdapterConfig,
    /// Google Pub/Sub settings (shares the adapter config type).
    pub google_pubsub: GooglePubSubAdapterConfig,
    /// Kafka settings (shares the adapter config type).
    pub kafka: KafkaAdapterConfig,
    /// Iggy settings.
    pub iggy: IggyConfig,
    /// SQS settings.
    pub sqs: SqsQueueConfig,
    /// SNS settings.
    pub sns: SnsQueueConfig,
}
1794
/// Redis queue settings.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisQueueConfig {
    /// Concurrency; defaults to 5.
    pub concurrency: u32,
    /// Optional prefix; defaults to `Some("sockudo_queue:")`.
    pub prefix: Option<String>,
    /// Optional URL override; unset by default.
    pub url_override: Option<String>,
    /// Cluster mode toggle; defaults to `false`.
    pub cluster_mode: bool,
}
1803
/// A single rate limit rule.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct RateLimit {
    /// Maximum requests; defaults to 60.
    pub max_requests: u32,
    /// Window length in seconds (unit per the field name); defaults to 60.
    pub window_seconds: u64,
    /// Optional identifier; defaults to `Some("default")`.
    pub identifier: Option<String>,
    /// Optional hop count; defaults to `Some(0)`.
    /// NOTE(review): presumably the number of trusted proxy hops — confirm.
    pub trust_hops: Option<u32>,
}
1812
/// Rate limiter settings.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RateLimiterConfig {
    /// Feature toggle.
    pub enabled: bool,
    /// Backing driver (reuses the cache driver enum).
    pub driver: CacheDriver,
    /// Rule for the API.
    pub api_rate_limit: RateLimit,
    /// Rule for WebSocket traffic.
    pub websocket_rate_limit: RateLimit,
    /// Redis settings.
    pub redis: RedisConfig,
}
1822
/// SSL/TLS settings.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SslConfig {
    /// Feature toggle.
    pub enabled: bool,
    /// Certificate path.
    pub cert_path: String,
    /// Key path.
    pub key_path: String,
    /// Optional passphrase.
    pub passphrase: Option<String>,
    /// Optional CA path.
    pub ca_path: Option<String>,
    /// HTTP redirect toggle.
    pub redirect_http: bool,
    /// Optional HTTP port.
    pub http_port: Option<u16>,
}
1834
/// Webhook delivery settings.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebhooksConfig {
    /// Batching settings.
    pub batching: BatchingConfig,
    /// Retry settings.
    pub retry: WebhookRetryConfig,
    /// Request timeout in milliseconds (unit per the field name); defaults to 10 000.
    pub request_timeout_ms: u64,
}
1842
/// Webhook batching settings.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct BatchingConfig {
    /// Feature toggle.
    pub enabled: bool,
    /// Batch duration. NOTE(review): unit is not visible here — confirm.
    pub duration: u64,
    /// Batch size.
    pub size: usize,
}
1850
/// Webhook retry settings.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebhookRetryConfig {
    /// Feature toggle; defaults to `true`.
    pub enabled: bool,
    /// Optional attempt cap; unset by default.
    pub max_attempts: Option<u32>,
    /// Maximum elapsed time in milliseconds (unit per the field name); defaults to 300 000.
    pub max_elapsed_time_ms: u64,
    /// Initial backoff in milliseconds (unit per the field name); defaults to 1000.
    pub initial_backoff_ms: u64,
    /// Maximum backoff in milliseconds (unit per the field name); defaults to 60 000.
    pub max_backoff_ms: u64,
}
1860
impl Default for WebhooksConfig {
    /// Nested sections at their own defaults and a 10 000 ms request timeout.
    fn default() -> Self {
        Self {
            batching: BatchingConfig::default(),
            retry: WebhookRetryConfig::default(),
            request_timeout_ms: 10_000,
        }
    }
}
1870
impl Default for WebhookRetryConfig {
    /// Retries enabled with no attempt cap, a 300 000 ms elapsed-time budget,
    /// and backoff between 1000 ms and 60 000 ms.
    fn default() -> Self {
        Self {
            enabled: true,
            // No attempt cap is set by default.
            max_attempts: None,
            max_elapsed_time_ms: 300_000,
            initial_backoff_ms: 1_000,
            max_backoff_ms: 60_000,
        }
    }
}
1882
/// Cluster health settings.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ClusterHealthConfig {
    /// Feature toggle.
    pub enabled: bool,
    /// Heartbeat interval in milliseconds (unit per the field name).
    pub heartbeat_interval_ms: u64,
    /// Node timeout in milliseconds (unit per the field name).
    pub node_timeout_ms: u64,
    /// Cleanup interval in milliseconds (unit per the field name).
    pub cleanup_interval_ms: u64,
}
1891
/// Unix socket settings.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct UnixSocketConfig {
    /// Feature toggle.
    pub enabled: bool,
    /// Socket path.
    pub path: String,
    /// Permission mode. It is deserialized from a string of octal digits
    /// (for example `"755"`) by `deserialize_octal_permission`, which rejects
    /// non-octal digits and values above `0o777`.
    ///
    /// NOTE(review): the derived `Serialize` writes this field as a plain
    /// integer, while deserialization expects an octal string, so a serialized
    /// config may not round-trip — confirm whether that matters.
    #[serde(deserialize_with = "deserialize_octal_permission")]
    pub permission_mode: u32,
}
1900
/// Delta compression settings.
///
/// Omitted fields take the values from this type's `Default` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DeltaCompressionOptionsConfig {
    /// Feature toggle.
    pub enabled: bool,
    /// Algorithm name.
    pub algorithm: String,
    /// Full-message interval.
    /// NOTE(review): presumably a full message is sent every N deltas — confirm.
    pub full_message_interval: u32,
    /// Minimum message size.
    pub min_message_size: usize,
    /// Maximum state age in seconds (unit per the field name).
    pub max_state_age_secs: u64,
    /// Maximum channel states per socket.
    pub max_channel_states_per_socket: usize,
    /// Optional maximum conflation states per channel.
    pub max_conflation_states_per_channel: Option<usize>,
    /// Optional conflation key path.
    pub conflation_key_path: Option<String>,
    /// Cluster coordination toggle.
    pub cluster_coordination: bool,
    /// Coordination backend.
    pub coordination_backend: DeltaCoordinationBackend,
    /// Toggle for omitting the delta algorithm.
    pub omit_delta_algorithm: bool,
}
1916
/// Cleanup settings.
///
/// Omitted fields take the values from this type's `Default` impl. Call
/// `validate` to reject zero-valued sizes, timeouts and fixed worker counts.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CleanupConfig {
    /// Queue buffer size; defaults to 1024. Must be greater than 0.
    pub queue_buffer_size: usize,
    /// Batch size; defaults to 64. Must be greater than 0.
    pub batch_size: usize,
    /// Batch timeout in milliseconds (unit per the field name); defaults to 100.
    /// Must be greater than 0.
    pub batch_timeout_ms: u64,
    /// Worker thread selection; defaults to `Auto`.
    pub worker_threads: WorkerThreadsConfig,
    /// Maximum retry attempts; defaults to 3.
    pub max_retry_attempts: u32,
    /// Async toggle; defaults to `true`.
    pub async_enabled: bool,
    /// Sync fallback toggle; defaults to `true`.
    pub fallback_to_sync: bool,
}
1929
/// Worker thread selection: automatic or a fixed count.
///
/// Serde support is hand-written: `Auto` is the string `"auto"` and
/// `Fixed(n)` is an integer.
#[derive(Debug, Clone)]
pub enum WorkerThreadsConfig {
    /// Serialized as `"auto"`.
    Auto,
    /// Serialized as the contained number.
    Fixed(usize),
}
1936
1937impl serde::Serialize for WorkerThreadsConfig {
1938 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1939 where
1940 S: serde::Serializer,
1941 {
1942 match self {
1943 WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1944 WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1945 }
1946 }
1947}
1948
1949impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1950 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1951 where
1952 D: serde::Deserializer<'de>,
1953 {
1954 use serde::de;
1955 struct WorkerThreadsVisitor;
1956 impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1957 type Value = WorkerThreadsConfig;
1958
1959 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1960 formatter.write_str(r#""auto" or a positive integer"#)
1961 }
1962
1963 fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1964 if value.eq_ignore_ascii_case("auto") {
1965 Ok(WorkerThreadsConfig::Auto)
1966 } else if let Ok(n) = value.parse::<usize>() {
1967 Ok(WorkerThreadsConfig::Fixed(n))
1968 } else {
1969 Err(E::custom(format!(
1970 "expected 'auto' or a number, got '{value}'"
1971 )))
1972 }
1973 }
1974
1975 fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1976 Ok(WorkerThreadsConfig::Fixed(value as usize))
1977 }
1978
1979 fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1980 if value >= 0 {
1981 Ok(WorkerThreadsConfig::Fixed(value as usize))
1982 } else {
1983 Err(E::custom("worker_threads must be non-negative"))
1984 }
1985 }
1986 }
1987 deserializer.deserialize_any(WorkerThreadsVisitor)
1988 }
1989}
1990
impl Default for CleanupConfig {
    /// Async cleanup with sync fallback, automatic worker count, and values
    /// that all pass `validate`.
    fn default() -> Self {
        Self {
            queue_buffer_size: 1024,
            batch_size: 64,
            batch_timeout_ms: 100,
            worker_threads: WorkerThreadsConfig::Auto,
            max_retry_attempts: 3,
            async_enabled: true,
            fallback_to_sync: true,
        }
    }
}
2004
2005impl CleanupConfig {
2006 pub fn validate(&self) -> Result<(), String> {
2007 if self.queue_buffer_size == 0 {
2008 return Err("queue_buffer_size must be greater than 0".to_string());
2009 }
2010 if self.batch_size == 0 {
2011 return Err("batch_size must be greater than 0".to_string());
2012 }
2013 if self.batch_timeout_ms == 0 {
2014 return Err("batch_timeout_ms must be greater than 0".to_string());
2015 }
2016 if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
2017 && n == 0
2018 {
2019 return Err("worker_threads must be greater than 0 when using fixed count".to_string());
2020 }
2021 Ok(())
2022 }
2023}
2024
2025#[derive(Debug, Clone, Serialize, Deserialize, Default)]
2026#[serde(default)]
2027pub struct TagFilteringConfig {
2028 #[serde(default)]
2029 pub enabled: bool,
2030 #[serde(default = "default_true")]
2031 pub enable_tags: bool,
2032}
2033
/// Serde default helper that returns `true`; referenced by
/// `#[serde(default = "default_true")]` on `TagFilteringConfig::enable_tags`.
fn default_true() -> bool {
    true
}
2037
2038impl Default for ServerOptions {
2041 fn default() -> Self {
2042 Self {
2043 adapter: AdapterConfig::default(),
2044 app_manager: AppManagerConfig::default(),
2045 cache: CacheConfig::default(),
2046 channel_limits: ChannelLimits::default(),
2047 cors: CorsConfig::default(),
2048 database: DatabaseConfig::default(),
2049 database_pooling: DatabasePooling::default(),
2050 debug: false,
2051 tag_filtering: TagFilteringConfig::default(),
2052 event_limits: EventLimits::default(),
2053 host: "0.0.0.0".to_string(),
2054 http_api: HttpApiConfig::default(),
2055 instance: InstanceConfig::default(),
2056 logging: None,
2057 metrics: MetricsConfig::default(),
2058 mode: "production".to_string(),
2059 port: 6001,
2060 path_prefix: "/".to_string(),
2061 presence: PresenceConfig::default(),
2062 queue: QueueConfig::default(),
2063 rate_limiter: RateLimiterConfig::default(),
2064 shutdown_grace_period: 10,
2065 ssl: SslConfig::default(),
2066 user_authentication_timeout: 3600,
2067 webhooks: WebhooksConfig::default(),
2068 websocket_max_payload_kb: 64,
2069 cleanup: CleanupConfig::default(),
2070 activity_timeout: 120,
2071 cluster_health: ClusterHealthConfig::default(),
2072 unix_socket: UnixSocketConfig::default(),
2073 delta_compression: DeltaCompressionOptionsConfig::default(),
2074 websocket: WebSocketConfig::default(),
2075 connection_recovery: ConnectionRecoveryConfig::default(),
2076 history: HistoryConfig::default(),
2077 presence_history: PresenceHistoryConfig::default(),
2078 idempotency: IdempotencyConfig::default(),
2079 ephemeral: EphemeralConfig::default(),
2080 echo_control: EchoControlConfig::default(),
2081 event_name_filtering: EventNameFilteringConfig::default(),
2082 versioned_messages: VersionedMessagesConfig::default(),
2083 annotations: AnnotationsConfig::default(),
2084 push: PushConfig::default(),
2085 health_check_timeout_ms: 400,
2086 }
2087 }
2088}
2089
2090impl Default for SqsQueueConfig {
2091 fn default() -> Self {
2092 Self {
2093 region: "us-east-1".to_string(),
2094 queue_url_prefix: None,
2095 visibility_timeout: 30,
2096 endpoint_url: None,
2097 max_messages: 10,
2098 wait_time_seconds: 5,
2099 concurrency: 5,
2100 fifo: false,
2101 message_group_id: Some("default".to_string()),
2102 }
2103 }
2104}
2105
2106impl Default for SnsQueueConfig {
2107 fn default() -> Self {
2108 Self {
2109 region: "us-east-1".to_string(),
2110 topic_arn: String::new(),
2111 endpoint_url: None,
2112 }
2113 }
2114}
2115
2116impl Default for RedisAdapterConfig {
2117 fn default() -> Self {
2118 Self {
2119 requests_timeout: 5000,
2120 prefix: "sockudo_adapter:".to_string(),
2121 redis_pub_options: AHashMap::new(),
2122 redis_sub_options: AHashMap::new(),
2123 cluster_mode: false,
2124 }
2125 }
2126}
2127
2128impl Default for RedisClusterAdapterConfig {
2129 fn default() -> Self {
2130 Self {
2131 nodes: vec![],
2132 prefix: "sockudo_adapter:".to_string(),
2133 request_timeout_ms: 1000,
2134 use_connection_manager: true,
2135 use_sharded_pubsub: false,
2136 }
2137 }
2138}
2139
2140impl Default for NatsAdapterConfig {
2141 fn default() -> Self {
2142 Self {
2143 servers: vec!["nats://localhost:4222".to_string()],
2144 prefix: "sockudo_adapter:".to_string(),
2145 request_timeout_ms: 5000,
2146 username: None,
2147 password: None,
2148 token: None,
2149 connection_timeout_ms: 5000,
2150 nodes_number: None,
2151 discovery_max_wait_ms: 1000,
2152 discovery_idle_wait_ms: 150,
2153 subscription_capacity: None,
2154 client_capacity: None,
2155 max_reconnects: None,
2156 presence_sync_chunk_size: None,
2157 }
2158 }
2159}
2160
2161impl Default for PulsarAdapterConfig {
2162 fn default() -> Self {
2163 Self {
2164 url: "pulsar://127.0.0.1:6650".to_string(),
2165 prefix: "sockudo-adapter".to_string(),
2166 request_timeout_ms: 5000,
2167 token: None,
2168 nodes_number: None,
2169 }
2170 }
2171}
2172
2173impl Default for RabbitMqAdapterConfig {
2174 fn default() -> Self {
2175 Self {
2176 url: "amqp://guest:guest@127.0.0.1:5672/%2f".to_string(),
2177 prefix: "sockudo_adapter".to_string(),
2178 request_timeout_ms: 5000,
2179 connection_timeout_ms: 5000,
2180 nodes_number: None,
2181 }
2182 }
2183}
2184
2185impl Default for GooglePubSubAdapterConfig {
2186 fn default() -> Self {
2187 Self {
2188 project_id: "".to_string(),
2189 prefix: "sockudo-adapter".to_string(),
2190 request_timeout_ms: 5000,
2191 emulator_host: None,
2192 nodes_number: None,
2193 }
2194 }
2195}
2196
2197impl Default for KafkaAdapterConfig {
2198 fn default() -> Self {
2199 Self {
2200 brokers: vec!["localhost:9092".to_string()],
2201 prefix: "sockudo_adapter".to_string(),
2202 request_timeout_ms: 5000,
2203 security_protocol: None,
2204 sasl_mechanism: None,
2205 sasl_username: None,
2206 sasl_password: None,
2207 nodes_number: None,
2208 }
2209 }
2210}
2211
2212impl Default for IggyConfig {
2213 fn default() -> Self {
2214 Self {
2215 connection_string: "iggy://iggy:iggy@127.0.0.1:8090".to_string(),
2216 username: None,
2217 password: None,
2218 consumer_name: None,
2219 stream: "sockudo".to_string(),
2220 topic_prefix: "sockudo-adapter".to_string(),
2221 queue_topic_prefix: "sockudo-queue".to_string(),
2222 consumer_group_prefix: "sockudo-workers".to_string(),
2223 request_timeout_ms: 5000,
2224 poll_interval_ms: 50,
2225 poll_batch_size: 100,
2226 partitions_count: 1,
2227 partition_id: 0,
2228 auto_create: true,
2229 start_from_latest: true,
2230 nodes_number: None,
2231 }
2232 }
2233}
2234
impl Default for CacheSettings {
    /// Cache enabled with a TTL of 300.
    fn default() -> Self {
        Self {
            enabled: true,
            ttl: 300,
        }
    }
}
2243
impl Default for MemoryCacheOptions {
    /// TTL 300, cleanup interval 60, capacity 10000.
    fn default() -> Self {
        Self {
            ttl: 300,
            cleanup_interval: 60,
            max_capacity: 10000,
        }
    }
}
2253
2254impl Default for CacheConfig {
2255 fn default() -> Self {
2256 Self {
2257 driver: CacheDriver::default(),
2258 redis: RedisConfig {
2259 prefix: Some("sockudo_cache:".to_string()),
2260 url_override: None,
2261 cluster_mode: false,
2262 },
2263 memory: MemoryCacheOptions::default(),
2264 }
2265 }
2266}
2267
impl Default for ChannelLimits {
    /// Channel names up to 200 long; cache TTL 3600.
    fn default() -> Self {
        Self {
            max_name_length: 200,
            cache_ttl: 3600,
        }
    }
}
2276impl Default for CorsConfig {
2277 fn default() -> Self {
2278 Self {
2279 credentials: true,
2280 origin: vec!["*".to_string()],
2281 methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
2282 allowed_headers: vec![
2283 "Authorization".to_string(),
2284 "Content-Type".to_string(),
2285 "X-Requested-With".to_string(),
2286 "Accept".to_string(),
2287 ],
2288 }
2289 }
2290}
2291
2292impl Default for DatabaseConnection {
2293 fn default() -> Self {
2294 Self {
2295 host: "localhost".to_string(),
2296 port: 3306,
2297 username: "root".to_string(),
2298 password: "".to_string(),
2299 database: "sockudo".to_string(),
2300 table_name: "applications".to_string(),
2301 connection_pool_size: 10,
2302 pool_min: None,
2303 pool_max: None,
2304 cache_ttl: 300,
2305 cache_cleanup_interval: 60,
2306 cache_max_capacity: 100,
2307 }
2308 }
2309}
2310
2311impl Default for RedisConnection {
2312 fn default() -> Self {
2313 Self {
2314 host: "127.0.0.1".to_string(),
2315 port: 6379,
2316 db: 0,
2317 username: None,
2318 password: None,
2319 key_prefix: "sockudo:".to_string(),
2320 sentinels: Vec::new(),
2321 sentinel_password: None,
2322 name: "mymaster".to_string(),
2323 cluster: RedisClusterConnection::default(),
2324 cluster_nodes: Vec::new(),
2325 }
2326 }
2327}
2328
2329impl Default for RedisSentinel {
2330 fn default() -> Self {
2331 Self {
2332 host: "localhost".to_string(),
2333 port: 26379,
2334 }
2335 }
2336}
2337
2338impl Default for ClusterNode {
2339 fn default() -> Self {
2340 Self {
2341 host: "127.0.0.1".to_string(),
2342 port: 7000,
2343 }
2344 }
2345}
2346
impl Default for DatabasePooling {
    /// Pooling enabled with `min` 2 and `max` 10.
    fn default() -> Self {
        Self {
            enabled: true,
            min: 2,
            max: 10,
        }
    }
}
2356
impl Default for EventLimits {
    /// Up to 100 channels at once, names up to 200 long, payloads up to
    /// 100 KB, and batches of up to 10.
    fn default() -> Self {
        Self {
            max_channels_at_once: 100,
            max_name_length: 200,
            max_payload_in_kb: 100,
            max_batch_size: 10,
        }
    }
}
2367
impl Default for IdempotencyConfig {
    /// Enabled, with a 120 s TTL and keys up to 128 long.
    fn default() -> Self {
        Self {
            enabled: true,
            ttl_seconds: 120,
            max_key_length: 128,
        }
    }
}
2377
impl Default for EphemeralConfig {
    /// Enabled by default.
    fn default() -> Self {
        Self { enabled: true }
    }
}
2383
impl Default for EchoControlConfig {
    /// Enabled, with `default_echo_messages` on.
    fn default() -> Self {
        Self {
            enabled: true,
            default_echo_messages: true,
        }
    }
}
2392
impl Default for EventNameFilteringConfig {
    /// Enabled, with up to 50 events per filter and event names up to 200 long.
    fn default() -> Self {
        Self {
            enabled: true,
            max_events_per_filter: 50,
            max_event_name_length: 200,
        }
    }
}
2402
impl Default for ConnectionRecoveryConfig {
    /// Disabled by default, with a 120 s buffer TTL and a buffer size of 100.
    fn default() -> Self {
        Self {
            enabled: false,
            buffer_ttl_seconds: 120,
            max_buffer_size: 100,
        }
    }
}
2412
2413impl Default for HttpApiConfig {
2414 fn default() -> Self {
2415 Self {
2416 request_limit_in_mb: 10,
2417 accept_traffic: AcceptTraffic::default(),
2418 usage_enabled: true,
2419 }
2420 }
2421}
2422
2423impl Default for AcceptTraffic {
2424 fn default() -> Self {
2425 Self {
2426 memory_threshold: 0.90,
2427 }
2428 }
2429}
2430
2431impl Default for InstanceConfig {
2432 fn default() -> Self {
2433 Self {
2434 process_id: uuid::Uuid::new_v4().to_string(),
2435 }
2436 }
2437}
2438
2439impl Default for LoggingConfig {
2440 fn default() -> Self {
2441 Self {
2442 colors_enabled: true,
2443 include_target: true,
2444 }
2445 }
2446}
2447
2448impl Default for MetricsConfig {
2449 fn default() -> Self {
2450 Self {
2451 enabled: true,
2452 driver: MetricsDriver::default(),
2453 host: "0.0.0.0".to_string(),
2454 prometheus: PrometheusConfig::default(),
2455 tcp_exporter: MetricsTcpExporterConfig::default(),
2456 port: 9601,
2457 }
2458 }
2459}
2460
2461impl Default for PrometheusConfig {
2462 fn default() -> Self {
2463 Self {
2464 prefix: "sockudo_".to_string(),
2465 }
2466 }
2467}
2468
2469impl Default for MetricsTcpExporterConfig {
2470 fn default() -> Self {
2471 Self {
2472 enabled: false,
2473 host: "127.0.0.1".to_string(),
2474 port: 5000,
2475 buffer_size: Some(1024),
2476 }
2477 }
2478}
2479
2480impl Default for PresenceConfig {
2481 fn default() -> Self {
2482 Self {
2483 max_members_per_channel: 100,
2484 max_member_size_in_kb: 2,
2485 }
2486 }
2487}
2488
2489impl Default for RedisQueueConfig {
2490 fn default() -> Self {
2491 Self {
2492 concurrency: 5,
2493 prefix: Some("sockudo_queue:".to_string()),
2494 url_override: None,
2495 cluster_mode: false,
2496 }
2497 }
2498}
2499
2500impl Default for RateLimit {
2501 fn default() -> Self {
2502 Self {
2503 max_requests: 60,
2504 window_seconds: 60,
2505 identifier: Some("default".to_string()),
2506 trust_hops: Some(0),
2507 }
2508 }
2509}
2510
2511impl Default for RateLimiterConfig {
2512 fn default() -> Self {
2513 Self {
2514 enabled: true,
2515 driver: CacheDriver::Memory,
2516 api_rate_limit: RateLimit {
2517 max_requests: 100,
2518 window_seconds: 60,
2519 identifier: Some("api".to_string()),
2520 trust_hops: Some(0),
2521 },
2522 websocket_rate_limit: RateLimit {
2523 max_requests: 20,
2524 window_seconds: 60,
2525 identifier: Some("websocket_connect".to_string()),
2526 trust_hops: Some(0),
2527 },
2528 redis: RedisConfig {
2529 prefix: Some("sockudo_rl:".to_string()),
2530 url_override: None,
2531 cluster_mode: false,
2532 },
2533 }
2534 }
2535}
2536
2537impl Default for SslConfig {
2538 fn default() -> Self {
2539 Self {
2540 enabled: false,
2541 cert_path: "".to_string(),
2542 key_path: "".to_string(),
2543 passphrase: None,
2544 ca_path: None,
2545 redirect_http: false,
2546 http_port: Some(80),
2547 }
2548 }
2549}
2550
2551impl Default for BatchingConfig {
2552 fn default() -> Self {
2553 Self {
2554 enabled: true,
2555 duration: 50,
2556 size: 100,
2557 }
2558 }
2559}
2560
2561impl Default for ClusterHealthConfig {
2562 fn default() -> Self {
2563 Self {
2564 enabled: true,
2565 heartbeat_interval_ms: 10000,
2566 node_timeout_ms: 30000,
2567 cleanup_interval_ms: 10000,
2568 }
2569 }
2570}
2571
2572impl Default for UnixSocketConfig {
2573 fn default() -> Self {
2574 Self {
2575 enabled: false,
2576 path: "/var/run/sockudo/sockudo.sock".to_string(),
2577 permission_mode: 0o660,
2578 }
2579 }
2580}
2581
2582impl Default for DeltaCompressionOptionsConfig {
2583 fn default() -> Self {
2584 Self {
2585 enabled: true,
2586 algorithm: "fossil".to_string(),
2587 full_message_interval: 10,
2588 min_message_size: 100,
2589 max_state_age_secs: 300,
2590 max_channel_states_per_socket: 100,
2591 max_conflation_states_per_channel: Some(100),
2592 conflation_key_path: None,
2593 cluster_coordination: false,
2594 coordination_backend: DeltaCoordinationBackend::Auto,
2595 omit_delta_algorithm: false,
2596 }
2597 }
2598}
2599
#[cfg(test)]
mod tests {
    use super::{
        DeltaCoordinationBackend, PushQueueDriver, PushStorageDriver, QueueDriver, ServerOptions,
        VersionStoreDriver,
    };
    use crate::app::{App, AppPolicy};
    use std::str::FromStr;

    /// Environment variables cleared before each inline-app bootstrap scenario.
    const APP_BOOTSTRAP_ENV_KEYS: &[&str] = &[
        "SOCKUDO_DEFAULT_APP_ID",
        "SOCKUDO_DEFAULT_APP_KEY",
        "SOCKUDO_DEFAULT_APP_SECRET",
        "SOCKUDO_DEFAULT_APP_ENABLED",
        "SOCKUDO_SKIP_INLINE_APPS",
        "APP_MANAGER_REGISTER_INLINE_APPS",
    ];

    /// Restores a set of environment variables to their previous values when
    /// dropped, including when the owning test unwinds from a panic.
    struct EnvGuard {
        /// `(key, value before the guard was created)`; `None` means the key was unset.
        previous: Vec<(&'static str, Option<String>)>,
    }

    impl EnvGuard {
        /// Snapshots every key in `clear` and `overrides`, removes the `clear`
        /// keys, then applies `overrides`. All snapshotted keys are restored on drop.
        fn scoped(clear: &[&'static str], overrides: &[(&'static str, &'static str)]) -> Self {
            let mut previous: Vec<(&'static str, Option<String>)> = clear
                .iter()
                .map(|key| (*key, std::env::var(key).ok()))
                .collect();
            // Override keys outside `clear` must be remembered too, otherwise
            // they would leak into later tests.
            for (key, _) in overrides {
                if !clear.contains(key) {
                    previous.push((*key, std::env::var(key).ok()));
                }
            }

            // SAFETY: mutating the process environment is only sound while no
            // other thread accesses it concurrently.
            // NOTE(review): this relies on the test harness not racing on these
            // keys; run with `--test-threads=1` if that is in doubt.
            unsafe {
                for key in clear {
                    std::env::remove_var(key);
                }
                for (key, value) in overrides {
                    std::env::set_var(key, value);
                }
            }

            Self { previous }
        }

        /// Clears every inline-app bootstrap variable, then applies `overrides`.
        fn app_bootstrap(overrides: &[(&'static str, &'static str)]) -> Self {
            Self::scoped(APP_BOOTSTRAP_ENV_KEYS, overrides)
        }

        /// Sets a single variable for the lifetime of the guard.
        fn set(key: &'static str, value: &'static str) -> Self {
            Self::scoped(&[], &[(key, value)])
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            // SAFETY: same requirement as in `EnvGuard::scoped` — no concurrent
            // access to the process environment while it is being mutated.
            unsafe {
                for (key, value) in &self.previous {
                    match value {
                        Some(value) => std::env::set_var(key, value),
                        None => std::env::remove_var(key),
                    }
                }
            }
        }
    }

    /// Builds the enabled inline app that the bootstrap tests push into the options.
    fn inline_test_app() -> App {
        App::from_policy(
            "app-id".to_string(),
            "app-key".to_string(),
            "app-secret".to_string(),
            true,
            AppPolicy::default(),
        )
    }

    #[test]
    fn queue_driver_parses_broker_backends() {
        assert_eq!(
            QueueDriver::from_str("rabbitmq").unwrap(),
            QueueDriver::RabbitMq
        );
        assert_eq!(QueueDriver::from_str("kafka").unwrap(), QueueDriver::Kafka);
        assert_eq!(
            QueueDriver::from_str("pulsar").unwrap(),
            QueueDriver::Pulsar
        );
        assert_eq!(
            QueueDriver::from_str("google-pubsub").unwrap(),
            QueueDriver::GooglePubSub
        );
    }

    #[test]
    fn delta_coordination_backend_parses_expected_values() {
        assert_eq!(
            DeltaCoordinationBackend::from_str("auto").unwrap(),
            DeltaCoordinationBackend::Auto
        );
        assert_eq!(
            DeltaCoordinationBackend::from_str("redis-cluster").unwrap(),
            DeltaCoordinationBackend::RedisCluster
        );
        assert_eq!(
            DeltaCoordinationBackend::from_str("nats").unwrap(),
            DeltaCoordinationBackend::Nats
        );
    }

    #[tokio::test]
    async fn app_bootstrap_env_overrides_inline_apps() {
        {
            let _env = EnvGuard::app_bootstrap(&[("SOCKUDO_DEFAULT_APP_ENABLED", "false")]);
            let mut options = ServerOptions::default();
            options.app_manager.array.apps.push(inline_test_app());

            options.override_from_env().await.unwrap();

            assert!(options.app_manager.array.apps.is_empty());
        }

        {
            let _env = EnvGuard::app_bootstrap(&[("SOCKUDO_DEFAULT_APP_ENABLED", "true")]);
            let mut options = ServerOptions::default();
            options.app_manager.array.apps.push(inline_test_app());

            options.override_from_env().await.unwrap();

            assert_eq!(options.app_manager.array.apps.len(), 1);
            assert_eq!(options.app_manager.array.apps[0].id, "app-id");
        }

        {
            let _env = EnvGuard::app_bootstrap(&[
                ("SOCKUDO_DEFAULT_APP_ID", "prod-app"),
                ("SOCKUDO_DEFAULT_APP_KEY", "prod-key"),
                ("SOCKUDO_DEFAULT_APP_SECRET", "prod-secret"),
                ("SOCKUDO_DEFAULT_APP_ENABLED", "true"),
            ]);
            let mut options = ServerOptions::default();
            options.app_manager.array.apps.push(inline_test_app());

            options.override_from_env().await.unwrap();

            assert_eq!(options.app_manager.array.apps.len(), 1);
            let app = &options.app_manager.array.apps[0];
            assert_eq!(app.id, "prod-app");
            assert_eq!(app.key, "prod-key");
            assert_eq!(app.secret, "prod-secret");
            assert!(app.enabled);
        }

        {
            let _env = EnvGuard::app_bootstrap(&[("APP_MANAGER_REGISTER_INLINE_APPS", "false")]);
            let mut options = ServerOptions::default();
            options.app_manager.array.apps.push(inline_test_app());

            options.override_from_env().await.unwrap();

            assert!(options.app_manager.array.apps.is_empty());
        }
    }

    #[tokio::test]
    async fn versioned_messages_driver_overrides_from_env() {
        // The guard restores the variable even if `override_from_env` panics.
        let _env = EnvGuard::set("VERSIONED_MESSAGES_DRIVER", "postgres");

        let mut options = ServerOptions::default();
        options.override_from_env().await.unwrap();

        assert_eq!(
            options.versioned_messages.driver,
            VersionStoreDriver::Postgres
        );
    }

    #[tokio::test]
    async fn push_storage_driver_overrides_from_env() {
        let _env = EnvGuard::set("PUSH_STORAGE_DRIVER", "mysql");

        let mut options = ServerOptions::default();
        options.override_from_env().await.unwrap();

        assert_eq!(options.push.storage_driver, PushStorageDriver::Mysql);
    }

    #[tokio::test]
    async fn push_queue_driver_overrides_from_env() {
        let _env = EnvGuard::set("PUSH_QUEUE_DRIVER", "redis-cluster");

        let mut options = ServerOptions::default();
        options.override_from_env().await.unwrap();

        assert_eq!(options.push.queue_driver, PushQueueDriver::RedisCluster);
    }

    #[tokio::test]
    async fn websocket_rate_limit_trust_hops_overrides_from_env() {
        let _env = EnvGuard::set("RATE_LIMITER_WS_TRUST_HOPS", "2");

        let mut options = ServerOptions::default();
        options.override_from_env().await.unwrap();

        assert_eq!(
            options.rate_limiter.websocket_rate_limit.trust_hops,
            Some(2)
        );
    }

    #[test]
    fn push_config_defaults_follow_capacity_model() {
        let options = ServerOptions::default();

        assert_eq!(options.push.storage_driver, PushStorageDriver::Memory);
        assert_eq!(options.push.queue_driver, PushQueueDriver::Memory);
        assert!(!options.push.fcm_enabled);
        assert!(!options.push.apns_enabled);
        assert!(!options.push.webpush_enabled);
        assert!(!options.push.hms_enabled);
        assert!(!options.push.wns_enabled);
        assert_eq!(options.push.fanout_fast_threshold, 10_000);
        assert_eq!(options.push.fanout_shard_size, 100_000);
        assert_eq!(options.push.publish_status_ttl_days, 30);
        assert_eq!(options.push.default_quotas.acceptance_rps, 100);
        assert_eq!(options.push.circuit_breaker.failure_threshold, 5);
        assert!(options.push.payload_redaction.redact_payload);
    }

    #[tokio::test]
    async fn push_release_env_overrides_are_parsed() {
        // Every key set here is snapshotted by the guard and restored on drop.
        let _env = EnvGuard::scoped(
            &[],
            &[
                ("PUSH_FCM_ENABLED", "true"),
                ("PUSH_APNS_ENABLED", "true"),
                ("PUSH_WEBPUSH_ENABLED", "true"),
                ("PUSH_HMS_ENABLED", "true"),
                ("PUSH_WNS_ENABLED", "true"),
                ("PUSH_CREDENTIAL_ENCRYPTION_KEY", "env:key:v1"),
                ("PUSH_FANOUT_FAST_THRESHOLD", "12345"),
                ("PUSH_FANOUT_SHARD_SIZE", "54321"),
                ("PUSH_FANOUT_SYNC_THRESHOLD", "250"),
                ("PUSH_BACKPRESSURE_LAG_THRESHOLD_SECS", "42"),
                ("PUSH_PUBLISH_STATUS_TTL_DAYS", "14"),
                ("PUSH_FAILURE_THRESHOLD", "9"),
                ("PUSH_SCHEDULER_INTERVAL_SECS", "11"),
                ("PUSH_STALE_DEVICE_MAX_AGE_DAYS", "120"),
                ("PUSH_ANALYTICS_ENABLED", "true"),
                ("PUSH_DEFAULT_ACCEPTANCE_RPS", "700"),
                ("PUSH_DEFAULT_DELIVERY_QUOTA_DAILY", "8000"),
                ("PUSH_DEFAULT_FANOUT_MAX", "9000"),
                ("PUSH_DEFAULT_INFLIGHT_MAX", "1000"),
            ],
        );

        let mut options = ServerOptions::default();
        options.override_from_env().await.unwrap();

        assert!(options.push.fcm_enabled);
        assert!(options.push.apns_enabled);
        assert!(options.push.webpush_enabled);
        assert!(options.push.hms_enabled);
        assert!(options.push.wns_enabled);
        assert_eq!(
            options.push.credential_encryption_key.as_deref(),
            Some("env:key:v1")
        );
        assert_eq!(options.push.fanout_fast_threshold, 12_345);
        assert_eq!(options.push.fanout_shard_size, 54_321);
        assert_eq!(options.push.fanout_sync_threshold, 250);
        assert_eq!(options.push.backpressure_lag_threshold_secs, 42);
        assert_eq!(options.push.publish_status_ttl_days, 14);
        assert_eq!(options.push.circuit_breaker.failure_threshold, 9);
        assert_eq!(options.push.scheduler_interval_secs, 11);
        assert_eq!(options.push.stale_device_max_age_days, 120);
        assert!(options.push.analytics_enabled);
        assert_eq!(options.push.default_quotas.acceptance_rps, 700);
        assert_eq!(options.push.default_quotas.delivery_quota_daily, 8_000);
        assert_eq!(options.push.default_quotas.fanout_max, 9_000);
        assert_eq!(options.push.default_quotas.inflight_max, 1_000);
    }
}
2957
2958impl ClusterHealthConfig {
2959 pub fn validate(&self) -> Result<(), String> {
2960 if self.heartbeat_interval_ms == 0 {
2961 return Err("heartbeat_interval_ms must be greater than 0".to_string());
2962 }
2963 if self.node_timeout_ms == 0 {
2964 return Err("node_timeout_ms must be greater than 0".to_string());
2965 }
2966 if self.cleanup_interval_ms == 0 {
2967 return Err("cleanup_interval_ms must be greater than 0".to_string());
2968 }
2969
2970 if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
2971 return Err(format!(
2972 "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
2973 self.heartbeat_interval_ms,
2974 self.node_timeout_ms,
2975 self.node_timeout_ms / 3
2976 ));
2977 }
2978
2979 if self.cleanup_interval_ms > self.node_timeout_ms {
2980 return Err(format!(
2981 "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
2982 self.cleanup_interval_ms, self.node_timeout_ms
2983 ));
2984 }
2985
2986 Ok(())
2987 }
2988}
2989
2990impl ServerOptions {
2991 pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
2992 let content = tokio::fs::read_to_string(path).await?;
2993 let options: Self = if path.ends_with(".toml") {
2994 toml::from_str(&content)?
2995 } else {
2996 sonic_rs::from_str(&content)?
2998 };
2999 Ok(options)
3000 }
3001
3002 pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
3003 if let Ok(mode) = std::env::var("ENVIRONMENT") {
3005 self.mode = mode;
3006 }
3007 self.debug = parse_bool_env("DEBUG_MODE", self.debug);
3008 if parse_bool_env("DEBUG", false) {
3009 self.debug = true;
3010 info!("DEBUG environment variable forces debug mode ON");
3011 }
3012
3013 self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
3014
3015 if let Ok(host) = std::env::var("HOST") {
3016 self.host = host;
3017 }
3018 self.port = parse_env::<u16>("PORT", self.port);
3019 self.shutdown_grace_period =
3020 parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
3021 self.user_authentication_timeout = parse_env::<u64>(
3022 "USER_AUTHENTICATION_TIMEOUT",
3023 self.user_authentication_timeout,
3024 );
3025 self.websocket_max_payload_kb =
3026 parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
3027 if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
3028 self.instance.process_id = id;
3029 }
3030
3031 if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
3033 self.adapter.driver =
3034 parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
3035 }
3036 self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
3037 "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
3038 self.adapter.buffer_multiplier_per_cpu,
3039 );
3040 self.adapter.enable_socket_counting = parse_env::<bool>(
3041 "ADAPTER_ENABLE_SOCKET_COUNTING",
3042 self.adapter.enable_socket_counting,
3043 );
3044 self.adapter.fallback_to_local =
3045 parse_env::<bool>("ADAPTER_FALLBACK_TO_LOCAL", self.adapter.fallback_to_local);
3046 if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
3047 self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
3048 }
3049 if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
3050 self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
3051 }
3052 if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
3053 self.app_manager.driver =
3054 parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
3055 }
3056 if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
3057 self.rate_limiter.driver = parse_driver_enum(
3058 driver_str,
3059 self.rate_limiter.driver.clone(),
3060 "RateLimiter Backend",
3061 );
3062 }
3063
3064 if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
3066 self.database.redis.host = host;
3067 }
3068 self.database.redis.port =
3069 parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
3070 if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
3071 self.database.redis.username = if username.is_empty() {
3072 None
3073 } else {
3074 Some(username)
3075 };
3076 }
3077 if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
3078 self.database.redis.password = Some(password);
3079 }
3080 self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
3081 if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
3082 self.database.redis.key_prefix = prefix;
3083 }
3084 if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
3085 self.database.redis.cluster.username = if cluster_username.is_empty() {
3086 None
3087 } else {
3088 Some(cluster_username)
3089 };
3090 }
3091 if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
3092 self.database.redis.cluster.password = Some(cluster_password);
3093 }
3094 self.database.redis.cluster.use_tls = parse_bool_env(
3095 "DATABASE_REDIS_CLUSTER_USE_TLS",
3096 self.database.redis.cluster.use_tls,
3097 );
3098
3099 if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
3101 self.database.mysql.host = host;
3102 }
3103 self.database.mysql.port =
3104 parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
3105 if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
3106 self.database.mysql.username = user;
3107 }
3108 if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
3109 self.database.mysql.password = pass;
3110 }
3111 if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
3112 self.database.mysql.database = db;
3113 }
3114 if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
3115 self.database.mysql.table_name = table;
3116 }
3117 override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
3118
3119 if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
3121 self.database.postgres.host = host;
3122 }
3123 self.database.postgres.port =
3124 parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
3125 if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
3126 self.database.postgres.username = user;
3127 }
3128 if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
3129 self.database.postgres.password = pass;
3130 }
3131 if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
3132 self.database.postgres.database = db;
3133 }
3134 override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
3135
3136 if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
3138 self.database.dynamodb.region = region;
3139 }
3140 if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
3141 self.database.dynamodb.table_name = table;
3142 }
3143 if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
3144 self.database.dynamodb.endpoint_url = Some(endpoint);
3145 }
3146 if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
3147 self.database.dynamodb.aws_access_key_id = Some(key_id);
3148 }
3149 if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
3150 self.database.dynamodb.aws_secret_access_key = Some(secret);
3151 }
3152
3153 if let Ok(url) = std::env::var("DATABASE_SURREALDB_URL") {
3155 self.database.surrealdb.url = url;
3156 }
3157 if let Ok(namespace) = std::env::var("DATABASE_SURREALDB_NAMESPACE") {
3158 self.database.surrealdb.namespace = namespace;
3159 }
3160 if let Ok(database) = std::env::var("DATABASE_SURREALDB_DATABASE") {
3161 self.database.surrealdb.database = database;
3162 }
3163 if let Ok(username) = std::env::var("DATABASE_SURREALDB_USERNAME") {
3164 self.database.surrealdb.username = username;
3165 }
3166 if let Ok(password) = std::env::var("DATABASE_SURREALDB_PASSWORD") {
3167 self.database.surrealdb.password = password;
3168 }
3169 if let Ok(table) = std::env::var("DATABASE_SURREALDB_TABLE_NAME") {
3170 self.database.surrealdb.table_name = table;
3171 }
3172
3173 let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
3175 let node_list: Vec<String> = nodes
3176 .split(',')
3177 .map(|s| s.trim())
3178 .filter(|s| !s.is_empty())
3179 .map(ToString::to_string)
3180 .collect();
3181
3182 options.adapter.cluster.nodes = node_list.clone();
3183 options.queue.redis_cluster.nodes = node_list.clone();
3184
3185 let parsed_nodes: Vec<ClusterNode> = node_list
3186 .iter()
3187 .filter_map(|seed| ClusterNode::from_seed(seed))
3188 .collect();
3189 options.database.redis.cluster.nodes = parsed_nodes.clone();
3190 options.database.redis.cluster_nodes = parsed_nodes;
3191 };
3192
3193 if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
3194 apply_redis_cluster_nodes(self, &nodes);
3195 }
3196 if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
3197 apply_redis_cluster_nodes(self, &nodes);
3198 }
3199 self.queue.redis_cluster.concurrency = parse_env::<u32>(
3200 "REDIS_CLUSTER_QUEUE_CONCURRENCY",
3201 self.queue.redis_cluster.concurrency,
3202 );
3203 if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
3204 self.queue.redis_cluster.prefix = Some(prefix);
3205 }
3206
3207 self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
3209 if let Ok(val) = std::env::var("SSL_CERT_PATH") {
3210 self.ssl.cert_path = val;
3211 }
3212 if let Ok(val) = std::env::var("SSL_KEY_PATH") {
3213 self.ssl.key_path = val;
3214 }
3215 self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
3216 if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
3217 self.ssl.http_port = Some(port);
3218 }
3219
3220 self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
3222 if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
3223 self.unix_socket.path = path;
3224 }
3225 if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
3226 if mode_str.chars().all(|c| c.is_digit(8)) {
3227 if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
3228 if mode <= 0o777 {
3229 self.unix_socket.permission_mode = mode;
3230 } else {
3231 warn!(
3232 "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
3233 mode_str, self.unix_socket.permission_mode
3234 );
3235 }
3236 } else {
3237 warn!(
3238 "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
3239 mode_str, self.unix_socket.permission_mode
3240 );
3241 }
3242 } else {
3243 warn!(
3244 "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
3245 mode_str, self.unix_socket.permission_mode
3246 );
3247 }
3248 }
3249
3250 if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
3252 self.metrics.driver =
3253 parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
3254 }
3255 self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
3256 if let Ok(val) = std::env::var("METRICS_HOST") {
3257 self.metrics.host = val;
3258 }
3259 self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
3260 if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
3261 self.metrics.prometheus.prefix = val;
3262 }
3263 self.metrics.tcp_exporter.enabled = parse_bool_env(
3264 "METRICS_TCP_EXPORTER_ENABLED",
3265 self.metrics.tcp_exporter.enabled,
3266 );
3267 if let Ok(val) = std::env::var("METRICS_TCP_EXPORTER_HOST") {
3268 self.metrics.tcp_exporter.host = val;
3269 }
3270 self.metrics.tcp_exporter.port =
3271 parse_env::<u16>("METRICS_TCP_EXPORTER_PORT", self.metrics.tcp_exporter.port);
3272 if let Some(buffer_size) = parse_env_optional::<usize>("METRICS_TCP_EXPORTER_BUFFER_SIZE") {
3273 self.metrics.tcp_exporter.buffer_size = Some(buffer_size);
3274 }
3275
3276 self.http_api.usage_enabled =
3278 parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
3279
3280 self.rate_limiter.enabled =
3282 parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
3283 self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
3284 "RATE_LIMITER_API_MAX_REQUESTS",
3285 self.rate_limiter.api_rate_limit.max_requests,
3286 );
3287 self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
3288 "RATE_LIMITER_API_WINDOW_SECONDS",
3289 self.rate_limiter.api_rate_limit.window_seconds,
3290 );
3291 if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
3292 self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
3293 }
3294 self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
3295 "RATE_LIMITER_WS_MAX_REQUESTS",
3296 self.rate_limiter.websocket_rate_limit.max_requests,
3297 );
3298 self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
3299 "RATE_LIMITER_WS_WINDOW_SECONDS",
3300 self.rate_limiter.websocket_rate_limit.window_seconds,
3301 );
3302 if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_WS_TRUST_HOPS") {
3303 self.rate_limiter.websocket_rate_limit.trust_hops = Some(hops);
3304 }
3305 if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
3306 self.rate_limiter.redis.prefix = Some(prefix);
3307 }
3308
3309 self.queue.redis.concurrency =
3311 parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
3312 if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
3313 self.queue.redis.prefix = Some(prefix);
3314 }
3315
3316 if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
3318 self.queue.sqs.region = region;
3319 }
3320 self.queue.sqs.visibility_timeout = parse_env::<i32>(
3321 "QUEUE_SQS_VISIBILITY_TIMEOUT",
3322 self.queue.sqs.visibility_timeout,
3323 );
3324 self.queue.sqs.max_messages =
3325 parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
3326 self.queue.sqs.wait_time_seconds = parse_env::<i32>(
3327 "QUEUE_SQS_WAIT_TIME_SECONDS",
3328 self.queue.sqs.wait_time_seconds,
3329 );
3330 self.queue.sqs.concurrency =
3331 parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
3332 self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
3333 if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
3334 self.queue.sqs.endpoint_url = Some(endpoint);
3335 }
3336
3337 if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
3339 self.queue.sns.region = region;
3340 }
3341 if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
3342 self.queue.sns.topic_arn = topic_arn;
3343 }
3344 if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
3345 self.queue.sns.endpoint_url = Some(endpoint);
3346 }
3347
3348 self.webhooks.batching.enabled =
3350 parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
3351 self.webhooks.batching.duration =
3352 parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
3353 self.webhooks.batching.size =
3354 parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
3355
3356 if let Ok(servers) = std::env::var("NATS_SERVERS") {
3358 self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
3359 }
3360 if let Ok(user) = std::env::var("NATS_USERNAME") {
3361 self.adapter.nats.username = Some(user);
3362 }
3363 if let Ok(pass) = std::env::var("NATS_PASSWORD") {
3364 self.adapter.nats.password = Some(pass);
3365 }
3366 if let Ok(token) = std::env::var("NATS_TOKEN") {
3367 self.adapter.nats.token = Some(token);
3368 }
3369 if let Ok(prefix) = std::env::var("NATS_PREFIX") {
3370 self.adapter.nats.prefix = prefix;
3371 }
3372 self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
3373 "NATS_CONNECTION_TIMEOUT_MS",
3374 self.adapter.nats.connection_timeout_ms,
3375 );
3376 self.adapter.nats.request_timeout_ms = parse_env::<u64>(
3377 "NATS_REQUEST_TIMEOUT_MS",
3378 self.adapter.nats.request_timeout_ms,
3379 );
3380 self.adapter.nats.discovery_max_wait_ms = parse_env::<u64>(
3381 "NATS_DISCOVERY_MAX_WAIT_MS",
3382 self.adapter.nats.discovery_max_wait_ms,
3383 );
3384 self.adapter.nats.discovery_idle_wait_ms = parse_env::<u64>(
3385 "NATS_DISCOVERY_IDLE_WAIT_MS",
3386 self.adapter.nats.discovery_idle_wait_ms,
3387 );
3388 if let Some(nodes) = parse_env_optional::<u32>("NATS_NODES_NUMBER") {
3389 self.adapter.nats.nodes_number = Some(nodes);
3390 }
3391 if let Some(v) = parse_env_optional::<usize>("NATS_SUBSCRIPTION_CAPACITY") {
3392 self.adapter.nats.subscription_capacity = Some(v);
3393 }
3394 if let Some(v) = parse_env_optional::<usize>("NATS_CLIENT_CAPACITY") {
3395 self.adapter.nats.client_capacity = Some(v);
3396 }
3397 if let Some(v) = parse_env_optional::<usize>("NATS_MAX_RECONNECTS") {
3398 self.adapter.nats.max_reconnects = Some(v);
3399 }
3400 if let Some(v) = parse_env_optional::<usize>("NATS_PRESENCE_SYNC_CHUNK_SIZE") {
3401 self.adapter.nats.presence_sync_chunk_size = Some(v);
3402 }
3403
3404 if let Ok(url) = std::env::var("PULSAR_URL") {
3406 self.adapter.pulsar.url = url;
3407 }
3408 if let Ok(prefix) = std::env::var("PULSAR_PREFIX") {
3409 self.adapter.pulsar.prefix = prefix;
3410 }
3411 if let Ok(token) = std::env::var("PULSAR_TOKEN") {
3412 self.adapter.pulsar.token = Some(token);
3413 }
3414 self.adapter.pulsar.request_timeout_ms = parse_env::<u64>(
3415 "PULSAR_REQUEST_TIMEOUT_MS",
3416 self.adapter.pulsar.request_timeout_ms,
3417 );
3418 if let Some(nodes) = parse_env_optional::<u32>("PULSAR_NODES_NUMBER") {
3419 self.adapter.pulsar.nodes_number = Some(nodes);
3420 }
3421
3422 if let Ok(url) = std::env::var("RABBITMQ_URL") {
3424 self.adapter.rabbitmq.url = url;
3425 }
3426 if let Ok(prefix) = std::env::var("RABBITMQ_PREFIX") {
3427 self.adapter.rabbitmq.prefix = prefix;
3428 }
3429 self.adapter.rabbitmq.connection_timeout_ms = parse_env::<u64>(
3430 "RABBITMQ_CONNECTION_TIMEOUT_MS",
3431 self.adapter.rabbitmq.connection_timeout_ms,
3432 );
3433 self.adapter.rabbitmq.request_timeout_ms = parse_env::<u64>(
3434 "RABBITMQ_REQUEST_TIMEOUT_MS",
3435 self.adapter.rabbitmq.request_timeout_ms,
3436 );
3437 if let Some(nodes) = parse_env_optional::<u32>("RABBITMQ_NODES_NUMBER") {
3438 self.adapter.rabbitmq.nodes_number = Some(nodes);
3439 }
3440
3441 if let Ok(project_id) = std::env::var("GOOGLE_PUBSUB_PROJECT_ID") {
3443 self.adapter.google_pubsub.project_id = project_id;
3444 }
3445 if let Ok(prefix) = std::env::var("GOOGLE_PUBSUB_PREFIX") {
3446 self.adapter.google_pubsub.prefix = prefix;
3447 }
3448 if let Ok(emulator_host) = std::env::var("PUBSUB_EMULATOR_HOST") {
3449 self.adapter.google_pubsub.emulator_host = Some(emulator_host);
3450 }
3451 self.adapter.google_pubsub.request_timeout_ms = parse_env::<u64>(
3452 "GOOGLE_PUBSUB_REQUEST_TIMEOUT_MS",
3453 self.adapter.google_pubsub.request_timeout_ms,
3454 );
3455 if let Some(nodes) = parse_env_optional::<u32>("GOOGLE_PUBSUB_NODES_NUMBER") {
3456 self.adapter.google_pubsub.nodes_number = Some(nodes);
3457 }
3458
3459 if let Ok(brokers) = std::env::var("KAFKA_BROKERS") {
3461 self.adapter.kafka.brokers = brokers.split(',').map(|s| s.trim().to_string()).collect();
3462 }
3463 if let Ok(prefix) = std::env::var("KAFKA_PREFIX") {
3464 self.adapter.kafka.prefix = prefix;
3465 }
3466 if let Ok(protocol) = std::env::var("KAFKA_SECURITY_PROTOCOL") {
3467 self.adapter.kafka.security_protocol = Some(protocol);
3468 }
3469 if let Ok(mechanism) = std::env::var("KAFKA_SASL_MECHANISM") {
3470 self.adapter.kafka.sasl_mechanism = Some(mechanism);
3471 }
3472 if let Ok(username) = std::env::var("KAFKA_SASL_USERNAME") {
3473 self.adapter.kafka.sasl_username = Some(username);
3474 }
3475 if let Ok(password) = std::env::var("KAFKA_SASL_PASSWORD") {
3476 self.adapter.kafka.sasl_password = Some(password);
3477 }
3478 self.adapter.kafka.request_timeout_ms = parse_env::<u64>(
3479 "KAFKA_REQUEST_TIMEOUT_MS",
3480 self.adapter.kafka.request_timeout_ms,
3481 );
3482 if let Some(nodes) = parse_env_optional::<u32>("KAFKA_NODES_NUMBER") {
3483 self.adapter.kafka.nodes_number = Some(nodes);
3484 }
3485
3486 if let Ok(connection_string) = std::env::var("IGGY_CONNECTION_STRING") {
3488 self.adapter.iggy.connection_string = connection_string.clone();
3489 self.queue.iggy.connection_string = connection_string;
3490 }
3491 if let Ok(username) = std::env::var("IGGY_USERNAME") {
3492 let username = (!username.is_empty()).then_some(username);
3493 self.adapter.iggy.username = username.clone();
3494 self.queue.iggy.username = username;
3495 }
3496 if let Ok(password) = std::env::var("IGGY_PASSWORD") {
3497 let password = (!password.is_empty()).then_some(password);
3498 self.adapter.iggy.password = password.clone();
3499 self.queue.iggy.password = password;
3500 }
3501 if let Ok(consumer_name) = std::env::var("IGGY_CONSUMER_NAME") {
3502 let consumer_name = (!consumer_name.is_empty()).then_some(consumer_name);
3503 self.adapter.iggy.consumer_name = consumer_name.clone();
3504 self.queue.iggy.consumer_name = consumer_name;
3505 } else if let Ok(process_id) = std::env::var("INSTANCE_PROCESS_ID") {
3506 let process_id = (!process_id.is_empty()).then_some(process_id);
3507 self.adapter.iggy.consumer_name = process_id.clone();
3508 self.queue.iggy.consumer_name = process_id;
3509 }
3510 if let Ok(stream) = std::env::var("IGGY_STREAM") {
3511 self.adapter.iggy.stream = stream.clone();
3512 self.queue.iggy.stream = stream;
3513 }
3514 if let Ok(prefix) = std::env::var("IGGY_TOPIC_PREFIX") {
3515 self.adapter.iggy.topic_prefix = prefix;
3516 }
3517 if let Ok(prefix) = std::env::var("IGGY_QUEUE_TOPIC_PREFIX") {
3518 self.queue.iggy.queue_topic_prefix = prefix;
3519 }
3520 if let Ok(prefix) = std::env::var("IGGY_CONSUMER_GROUP_PREFIX") {
3521 self.queue.iggy.consumer_group_prefix = prefix;
3522 }
3523 self.adapter.iggy.request_timeout_ms = parse_env::<u64>(
3524 "IGGY_REQUEST_TIMEOUT_MS",
3525 self.adapter.iggy.request_timeout_ms,
3526 );
3527 self.queue.iggy.request_timeout_ms = parse_env::<u64>(
3528 "IGGY_REQUEST_TIMEOUT_MS",
3529 self.queue.iggy.request_timeout_ms,
3530 );
3531 self.adapter.iggy.poll_interval_ms =
3532 parse_env::<u64>("IGGY_POLL_INTERVAL_MS", self.adapter.iggy.poll_interval_ms);
3533 self.queue.iggy.poll_interval_ms =
3534 parse_env::<u64>("IGGY_POLL_INTERVAL_MS", self.queue.iggy.poll_interval_ms);
3535 self.adapter.iggy.poll_batch_size =
3536 parse_env::<u32>("IGGY_POLL_BATCH_SIZE", self.adapter.iggy.poll_batch_size);
3537 self.queue.iggy.poll_batch_size =
3538 parse_env::<u32>("IGGY_POLL_BATCH_SIZE", self.queue.iggy.poll_batch_size);
3539 self.adapter.iggy.partitions_count = parse_env::<u32>(
3540 "ADAPTER_IGGY_PARTITIONS_COUNT",
3541 parse_env::<u32>("IGGY_PARTITIONS_COUNT", self.adapter.iggy.partitions_count),
3542 );
3543 self.queue.iggy.partitions_count = parse_env::<u32>(
3544 "QUEUE_IGGY_PARTITIONS_COUNT",
3545 parse_env::<u32>("IGGY_PARTITIONS_COUNT", self.queue.iggy.partitions_count),
3546 );
3547 self.adapter.iggy.partition_id = parse_env::<u32>(
3548 "ADAPTER_IGGY_PARTITION_ID",
3549 parse_env::<u32>("IGGY_PARTITION_ID", self.adapter.iggy.partition_id),
3550 );
3551 self.queue.iggy.partition_id = parse_env::<u32>(
3552 "QUEUE_IGGY_PARTITION_ID",
3553 parse_env::<u32>("IGGY_PARTITION_ID", self.queue.iggy.partition_id),
3554 );
3555 self.adapter.iggy.auto_create =
3556 parse_env::<bool>("IGGY_AUTO_CREATE", self.adapter.iggy.auto_create);
3557 self.queue.iggy.auto_create =
3558 parse_env::<bool>("IGGY_AUTO_CREATE", self.queue.iggy.auto_create);
3559 self.adapter.iggy.start_from_latest = parse_env::<bool>(
3560 "IGGY_START_FROM_LATEST",
3561 self.adapter.iggy.start_from_latest,
3562 );
3563 if let Some(nodes) = parse_env_optional::<u32>("IGGY_NODES_NUMBER") {
3564 self.adapter.iggy.nodes_number = Some(nodes);
3565 self.queue.iggy.nodes_number = Some(nodes);
3566 }
3567
3568 if let Ok(origins) = std::env::var("CORS_ORIGINS") {
3570 let parsed: Vec<String> = origins.split(',').map(|s| s.trim().to_string()).collect();
3571 if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&parsed) {
3572 warn!(
3573 "CORS_ORIGINS contains invalid patterns: {}. Keeping previous CORS origins.",
3574 e
3575 );
3576 } else {
3577 self.cors.origin = parsed;
3578 }
3579 }
3580 if let Ok(methods) = std::env::var("CORS_METHODS") {
3581 self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
3582 }
3583 if let Ok(headers) = std::env::var("CORS_HEADERS") {
3584 self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
3585 }
3586 self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
3587
3588 self.database_pooling.enabled =
3590 parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
3591 if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
3592 self.database_pooling.min = min;
3593 }
3594 if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
3595 self.database_pooling.max = max;
3596 }
3597
3598 if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
3599 self.database.mysql.connection_pool_size = pool_size;
3600 self.database.postgres.connection_pool_size = pool_size;
3601 }
3602 if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
3603 self.app_manager.cache.ttl = cache_ttl;
3604 self.channel_limits.cache_ttl = cache_ttl;
3605 self.database.mysql.cache_ttl = cache_ttl;
3606 self.database.postgres.cache_ttl = cache_ttl;
3607 self.database.surrealdb.cache_ttl = cache_ttl;
3608 self.cache.memory.ttl = cache_ttl;
3609 }
3610 if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
3611 self.database.mysql.cache_cleanup_interval = cleanup_interval;
3612 self.database.postgres.cache_cleanup_interval = cleanup_interval;
3613 self.cache.memory.cleanup_interval = cleanup_interval;
3614 }
3615 if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
3616 self.database.mysql.cache_max_capacity = max_capacity;
3617 self.database.postgres.cache_max_capacity = max_capacity;
3618 self.database.surrealdb.cache_max_capacity = max_capacity;
3619 self.cache.memory.max_capacity = max_capacity;
3620 }
3621
3622 let skip_inline_apps = parse_bool_env("SOCKUDO_SKIP_INLINE_APPS", false)
3623 || !parse_bool_env("APP_MANAGER_REGISTER_INLINE_APPS", true);
3624 if skip_inline_apps {
3625 let app_count = self.app_manager.array.apps.len();
3626 self.app_manager.array.apps.clear();
3627 if app_count > 0 {
3628 info!(
3629 "Skipping {} inline app(s) from configuration due to environment override",
3630 app_count
3631 );
3632 }
3633 }
3634
3635 let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID").ok();
3636 let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY").ok();
3637 let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET").ok();
3638 let default_app_enabled_env = std::env::var("SOCKUDO_DEFAULT_APP_ENABLED").ok();
3639 let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
3640 let default_app_credentials_configured =
3641 default_app_id.is_some() || default_app_key.is_some() || default_app_secret.is_some();
3642 let default_app_env_configured =
3643 default_app_credentials_configured || default_app_enabled_env.is_some();
3644 let default_app_should_override_inline = default_app_credentials_configured
3645 || default_app_enabled_env.is_some_and(|_| !default_app_enabled);
3646
3647 if default_app_should_override_inline {
3648 let app_count = self.app_manager.array.apps.len();
3649 self.app_manager.array.apps.clear();
3650 if app_count > 0 {
3651 info!(
3652 "Replacing {} inline app(s) from configuration with SOCKUDO_DEFAULT_APP_* settings",
3653 app_count
3654 );
3655 }
3656 }
3657
3658 if let (Some(app_id), Some(app_key), Some(app_secret)) =
3659 (default_app_id, default_app_key, default_app_secret)
3660 && default_app_enabled
3661 {
3662 let default_app = App::from_policy(
3663 app_id,
3664 app_key,
3665 app_secret,
3666 default_app_enabled,
3667 crate::app::AppPolicy {
3668 limits: crate::app::AppLimitsPolicy {
3669 max_connections: parse_env::<u32>(
3670 "SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS",
3671 100,
3672 ),
3673 max_backend_events_per_second: Some(parse_env::<u32>(
3674 "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
3675 100,
3676 )),
3677 max_client_events_per_second: parse_env::<u32>(
3678 "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
3679 100,
3680 ),
3681 max_read_requests_per_second: Some(parse_env::<u32>(
3682 "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
3683 100,
3684 )),
3685 max_presence_members_per_channel: Some(parse_env::<u32>(
3686 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
3687 100,
3688 )),
3689 max_presence_member_size_in_kb: Some(parse_env::<u32>(
3690 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
3691 100,
3692 )),
3693 max_channel_name_length: Some(parse_env::<u32>(
3694 "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
3695 100,
3696 )),
3697 max_event_channels_at_once: Some(parse_env::<u32>(
3698 "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
3699 100,
3700 )),
3701 max_event_name_length: Some(parse_env::<u32>(
3702 "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
3703 100,
3704 )),
3705 max_event_payload_in_kb: Some(parse_env::<u32>(
3706 "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
3707 100,
3708 )),
3709 max_event_batch_size: Some(parse_env::<u32>(
3710 "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
3711 100,
3712 )),
3713 decay_seconds: None,
3714 terminate_on_limit: false,
3715 message_rate_limit: None,
3716 },
3717 features: crate::app::AppFeaturesPolicy {
3718 enable_client_messages: std::env::var(
3719 "SOCKUDO_DEFAULT_APP_ENABLE_CLIENT_MESSAGES",
3720 )
3721 .ok()
3722 .map(|value| {
3723 matches!(
3724 value.trim().to_ascii_lowercase().as_str(),
3725 "true" | "1" | "yes" | "on"
3726 )
3727 })
3728 .unwrap_or_else(|| parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false)),
3729 enable_user_authentication: Some(parse_bool_env(
3730 "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
3731 false,
3732 )),
3733 enable_watchlist_events: Some(parse_bool_env(
3734 "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
3735 false,
3736 )),
3737 },
3738 channels: crate::app::AppChannelsPolicy {
3739 allowed_origins: {
3740 if let Ok(origins_str) =
3741 std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS")
3742 {
3743 if !origins_str.is_empty() {
3744 Some(
3745 origins_str
3746 .split(',')
3747 .map(|s| s.trim().to_string())
3748 .collect(),
3749 )
3750 } else {
3751 None
3752 }
3753 } else {
3754 None
3755 }
3756 },
3757 annotations_enabled: Some(parse_bool_env(
3758 "SOCKUDO_DEFAULT_APP_ANNOTATIONS_ENABLED",
3759 false,
3760 )),
3761 channel_delta_compression: None,
3762 channel_namespaces: None,
3763 },
3764 webhooks: None,
3765 idempotency: None,
3766 connection_recovery: None,
3767 history: None,
3768 presence_history: None,
3769 },
3770 );
3771
3772 self.app_manager.array.apps.push(default_app);
3773 info!("Successfully registered default app from env");
3774 } else if default_app_env_configured && !default_app_enabled {
3775 info!("Default app registration disabled by SOCKUDO_DEFAULT_APP_ENABLED=false");
3776 } else if default_app_credentials_configured {
3777 warn!(
3778 "SOCKUDO_DEFAULT_APP_* environment was configured but id, key, and secret were not all provided; no default app registered"
3779 );
3780 }
3781
3782 if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
3784 info!("Applying REDIS_URL environment variable override");
3785
3786 let redis_url_json = sonic_rs::json!(redis_url_env);
3787
3788 self.adapter
3789 .redis
3790 .redis_pub_options
3791 .insert("url".to_string(), redis_url_json.clone());
3792 self.adapter
3793 .redis
3794 .redis_sub_options
3795 .insert("url".to_string(), redis_url_json);
3796
3797 self.cache.redis.url_override = Some(redis_url_env.clone());
3798 self.queue.redis.url_override = Some(redis_url_env.clone());
3799 self.rate_limiter.redis.url_override = Some(redis_url_env);
3800 }
3801
3802 let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
3804 let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
3805 if has_colors_env || has_target_env {
3806 let logging_config = self.logging.get_or_insert_with(Default::default);
3807 if has_colors_env {
3808 logging_config.colors_enabled =
3809 parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
3810 }
3811 if has_target_env {
3812 logging_config.include_target =
3813 parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
3814 }
3815 }
3816
3817 self.cleanup.async_enabled =
3819 parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
3820 self.cleanup.fallback_to_sync =
3821 parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
3822 self.cleanup.queue_buffer_size =
3823 parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
3824 self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
3825 self.cleanup.batch_timeout_ms =
3826 parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
3827 if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
3828 self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
3829 WorkerThreadsConfig::Auto
3830 } else if let Ok(n) = worker_threads_str.parse::<usize>() {
3831 WorkerThreadsConfig::Fixed(n)
3832 } else {
3833 warn!(
3834 "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
3835 worker_threads_str
3836 );
3837 self.cleanup.worker_threads.clone()
3838 };
3839 }
3840 self.cleanup.max_retry_attempts = parse_env::<u32>(
3841 "CLEANUP_MAX_RETRY_ATTEMPTS",
3842 self.cleanup.max_retry_attempts,
3843 );
3844
3845 self.cluster_health.enabled =
3847 parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
3848 self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
3849 "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
3850 self.cluster_health.heartbeat_interval_ms,
3851 );
3852 self.cluster_health.node_timeout_ms = parse_env::<u64>(
3853 "CLUSTER_HEALTH_NODE_TIMEOUT",
3854 self.cluster_health.node_timeout_ms,
3855 );
3856 self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
3857 "CLUSTER_HEALTH_CLEANUP_INTERVAL",
3858 self.cluster_health.cleanup_interval_ms,
3859 );
3860
3861 self.health_check_timeout_ms =
3863 parse_env::<u64>("HEALTH_CHECK_TIMEOUT_MS", self.health_check_timeout_ms);
3864
3865 self.tag_filtering.enabled =
3867 parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
3868
3869 if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
3871 if val.to_lowercase() == "none" || val == "0" {
3872 self.websocket.max_messages = None;
3873 } else if let Ok(n) = val.parse::<usize>() {
3874 self.websocket.max_messages = Some(n);
3875 }
3876 }
3877 if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
3878 if val.to_lowercase() == "none" || val == "0" {
3879 self.websocket.max_bytes = None;
3880 } else if let Ok(n) = val.parse::<usize>() {
3881 self.websocket.max_bytes = Some(n);
3882 }
3883 }
3884 self.websocket.disconnect_on_buffer_full = parse_bool_env(
3885 "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
3886 self.websocket.disconnect_on_buffer_full,
3887 );
3888 self.websocket.max_message_size = parse_env::<usize>(
3889 "WEBSOCKET_MAX_MESSAGE_SIZE",
3890 self.websocket.max_message_size,
3891 );
3892 self.websocket.max_frame_size =
3893 parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
3894 self.websocket.write_buffer_size = parse_env::<usize>(
3895 "WEBSOCKET_WRITE_BUFFER_SIZE",
3896 self.websocket.write_buffer_size,
3897 );
3898 self.websocket.max_backpressure = parse_env::<usize>(
3899 "WEBSOCKET_MAX_BACKPRESSURE",
3900 self.websocket.max_backpressure,
3901 );
3902 self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
3903 self.websocket.ping_interval =
3904 parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
3905 self.websocket.idle_timeout =
3906 parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
3907 if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
3908 self.websocket.compression = mode;
3909 }
3910
3911 self.connection_recovery.enabled = parse_bool_env(
3913 "CONNECTION_RECOVERY_ENABLED",
3914 self.connection_recovery.enabled,
3915 );
3916 self.connection_recovery.buffer_ttl_seconds = parse_env::<u64>(
3917 "CONNECTION_RECOVERY_BUFFER_TTL",
3918 self.connection_recovery.buffer_ttl_seconds,
3919 );
3920 self.connection_recovery.max_buffer_size = parse_env::<usize>(
3921 "CONNECTION_RECOVERY_MAX_BUFFER_SIZE",
3922 self.connection_recovery.max_buffer_size,
3923 );
3924
3925 self.history.enabled = parse_bool_env("HISTORY_ENABLED", self.history.enabled);
3926 self.history.rewind_enabled =
3927 parse_bool_env("HISTORY_REWIND_ENABLED", self.history.rewind_enabled);
3928 self.history.retention_window_seconds = parse_env::<u64>(
3929 "HISTORY_RETENTION_WINDOW_SECONDS",
3930 self.history.retention_window_seconds,
3931 );
3932 self.history.max_page_size =
3933 parse_env::<usize>("HISTORY_MAX_PAGE_SIZE", self.history.max_page_size);
3934 self.history.writer_shards =
3935 parse_env::<usize>("HISTORY_WRITER_SHARDS", self.history.writer_shards);
3936 self.history.writer_queue_capacity = parse_env::<usize>(
3937 "HISTORY_WRITER_QUEUE_CAPACITY",
3938 self.history.writer_queue_capacity,
3939 );
3940 if let Ok(backend) = std::env::var("HISTORY_BACKEND") {
3941 self.history.backend = HistoryBackend::from_str(&backend)?;
3942 }
3943 if let Ok(max_messages) = std::env::var("HISTORY_MAX_MESSAGES_PER_CHANNEL") {
3944 self.history.max_messages_per_channel = Some(
3945 max_messages
3946 .parse::<usize>()
3947 .map_err(|e| format!("Invalid HISTORY_MAX_MESSAGES_PER_CHANNEL: {e}"))?,
3948 );
3949 }
3950 if let Ok(max_bytes) = std::env::var("HISTORY_MAX_BYTES_PER_CHANNEL") {
3951 self.history.max_bytes_per_channel = Some(
3952 max_bytes
3953 .parse::<u64>()
3954 .map_err(|e| format!("Invalid HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
3955 );
3956 }
3957 if let Ok(table_prefix) = std::env::var("HISTORY_POSTGRES_TABLE_PREFIX") {
3958 self.history.postgres.table_prefix = table_prefix;
3959 }
3960 self.history.postgres.write_timeout_ms = parse_env::<u64>(
3961 "HISTORY_POSTGRES_WRITE_TIMEOUT_MS",
3962 self.history.postgres.write_timeout_ms,
3963 );
3964 self.history.purge_interval_seconds = parse_env::<u64>(
3965 "HISTORY_PURGE_INTERVAL_SECONDS",
3966 self.history.purge_interval_seconds,
3967 );
3968 self.history.purge_batch_size =
3969 parse_env::<usize>("HISTORY_PURGE_BATCH_SIZE", self.history.purge_batch_size);
3970 self.history.max_purge_per_tick = parse_env::<usize>(
3971 "HISTORY_MAX_PURGE_PER_TICK",
3972 self.history.max_purge_per_tick,
3973 );
3974
3975 self.presence_history.enabled =
3976 parse_bool_env("PRESENCE_HISTORY_ENABLED", self.presence_history.enabled);
3977 self.presence_history.retention_window_seconds = parse_env::<u64>(
3978 "PRESENCE_HISTORY_RETENTION_WINDOW_SECONDS",
3979 self.presence_history.retention_window_seconds,
3980 );
3981 self.presence_history.max_page_size = parse_env::<usize>(
3982 "PRESENCE_HISTORY_MAX_PAGE_SIZE",
3983 self.presence_history.max_page_size,
3984 );
3985 if let Ok(max_events) = std::env::var("PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL") {
3986 self.presence_history.max_events_per_channel =
3987 Some(max_events.parse::<usize>().map_err(|e| {
3988 format!("Invalid PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL: {e}")
3989 })?);
3990 }
3991 if let Ok(max_bytes) = std::env::var("PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL") {
3992 self.presence_history.max_bytes_per_channel = Some(
3993 max_bytes
3994 .parse::<u64>()
3995 .map_err(|e| format!("Invalid PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
3996 );
3997 }
3998 self.idempotency.enabled = parse_bool_env("IDEMPOTENCY_ENABLED", self.idempotency.enabled);
3999 self.idempotency.ttl_seconds =
4000 parse_env::<u64>("IDEMPOTENCY_TTL_SECONDS", self.idempotency.ttl_seconds);
4001 self.idempotency.max_key_length = parse_env::<usize>(
4002 "IDEMPOTENCY_MAX_KEY_LENGTH",
4003 self.idempotency.max_key_length,
4004 );
4005
4006 self.ephemeral.enabled = parse_bool_env("EPHEMERAL_ENABLED", self.ephemeral.enabled);
4007
4008 self.echo_control.enabled =
4009 parse_bool_env("ECHO_CONTROL_ENABLED", self.echo_control.enabled);
4010 self.echo_control.default_echo_messages = parse_bool_env(
4011 "ECHO_CONTROL_DEFAULT_ECHO_MESSAGES",
4012 self.echo_control.default_echo_messages,
4013 );
4014
4015 self.event_name_filtering.enabled = parse_bool_env(
4016 "EVENT_NAME_FILTERING_ENABLED",
4017 self.event_name_filtering.enabled,
4018 );
4019 self.event_name_filtering.max_events_per_filter = parse_env::<usize>(
4020 "EVENT_NAME_FILTERING_MAX_EVENTS_PER_FILTER",
4021 self.event_name_filtering.max_events_per_filter,
4022 );
4023 self.event_name_filtering.max_event_name_length = parse_env::<usize>(
4024 "EVENT_NAME_FILTERING_MAX_EVENT_NAME_LENGTH",
4025 self.event_name_filtering.max_event_name_length,
4026 );
4027 self.versioned_messages.enabled = parse_bool_env(
4028 "VERSIONED_MESSAGES_ENABLED",
4029 self.versioned_messages.enabled,
4030 );
4031 if let Ok(driver_str) = std::env::var("VERSIONED_MESSAGES_DRIVER") {
4032 self.versioned_messages.driver = parse_driver_enum(
4033 driver_str,
4034 self.versioned_messages.driver.clone(),
4035 "VersionedMessages Backend",
4036 );
4037 }
4038 if let Ok(driver_str) = std::env::var("PUSH_STORAGE_DRIVER") {
4039 self.push.storage_driver = parse_driver_enum(
4040 driver_str,
4041 self.push.storage_driver.clone(),
4042 "Push Storage Backend",
4043 );
4044 }
4045 if let Ok(driver_str) = std::env::var("PUSH_QUEUE_DRIVER") {
4046 self.push.queue_driver = parse_driver_enum(
4047 driver_str,
4048 self.push.queue_driver.clone(),
4049 "Push Queue Backend",
4050 );
4051 }
4052 self.push.fcm_enabled = parse_bool_env("PUSH_FCM_ENABLED", self.push.fcm_enabled);
4053 self.push.apns_enabled = parse_bool_env("PUSH_APNS_ENABLED", self.push.apns_enabled);
4054 self.push.webpush_enabled =
4055 parse_bool_env("PUSH_WEBPUSH_ENABLED", self.push.webpush_enabled);
4056 self.push.hms_enabled = parse_bool_env("PUSH_HMS_ENABLED", self.push.hms_enabled);
4057 self.push.wns_enabled = parse_bool_env("PUSH_WNS_ENABLED", self.push.wns_enabled);
4058 if let Some(key) = parse_env_optional::<String>("PUSH_CREDENTIAL_ENCRYPTION_KEY") {
4059 self.push.credential_encryption_key = Some(key);
4060 }
4061 self.push.fanout_fast_threshold = parse_env::<u64>(
4062 "PUSH_FANOUT_FAST_THRESHOLD",
4063 self.push.fanout_fast_threshold,
4064 );
4065 self.push.fanout_shard_size =
4066 parse_env::<u64>("PUSH_FANOUT_SHARD_SIZE", self.push.fanout_shard_size);
4067 self.push.fanout_sync_threshold = parse_env::<u64>(
4068 "PUSH_FANOUT_SYNC_THRESHOLD",
4069 self.push.fanout_sync_threshold,
4070 );
4071 self.push.backpressure_lag_threshold_secs = parse_env::<u64>(
4072 "PUSH_BACKPRESSURE_LAG_THRESHOLD_SECS",
4073 self.push.backpressure_lag_threshold_secs,
4074 );
4075 self.push.publish_status_ttl_days = parse_env::<u64>(
4076 "PUSH_PUBLISH_STATUS_TTL_DAYS",
4077 self.push.publish_status_ttl_days,
4078 );
4079 self.push.circuit_breaker.failure_threshold = parse_env::<u32>(
4080 "PUSH_FAILURE_THRESHOLD",
4081 self.push.circuit_breaker.failure_threshold,
4082 );
4083 self.push.scheduler_interval_secs = parse_env::<u64>(
4084 "PUSH_SCHEDULER_INTERVAL_SECS",
4085 self.push.scheduler_interval_secs,
4086 );
4087 self.push.stale_device_max_age_days = parse_env::<u64>(
4088 "PUSH_STALE_DEVICE_MAX_AGE_DAYS",
4089 self.push.stale_device_max_age_days,
4090 );
4091 self.push.analytics_enabled =
4092 parse_bool_env("PUSH_ANALYTICS_ENABLED", self.push.analytics_enabled);
4093 self.push.default_quotas.acceptance_rps = parse_env::<u64>(
4094 "PUSH_DEFAULT_ACCEPTANCE_RPS",
4095 self.push.default_quotas.acceptance_rps,
4096 );
4097 self.push.default_quotas.delivery_quota_daily = parse_env::<u64>(
4098 "PUSH_DEFAULT_DELIVERY_QUOTA_DAILY",
4099 self.push.default_quotas.delivery_quota_daily,
4100 );
4101 self.push.default_quotas.fanout_max = parse_env::<u64>(
4102 "PUSH_DEFAULT_FANOUT_MAX",
4103 self.push.default_quotas.fanout_max,
4104 );
4105 self.push.default_quotas.inflight_max = parse_env::<u64>(
4106 "PUSH_DEFAULT_INFLIGHT_MAX",
4107 self.push.default_quotas.inflight_max,
4108 );
4109 self.versioned_messages.max_page_size = parse_env::<usize>(
4110 "VERSIONED_MESSAGES_MAX_PAGE_SIZE",
4111 self.versioned_messages.max_page_size,
4112 );
4113 self.versioned_messages.retention_window_seconds = parse_env::<u64>(
4114 "VERSIONED_MESSAGES_RETENTION_WINDOW_SECONDS",
4115 self.versioned_messages.retention_window_seconds,
4116 );
4117 self.versioned_messages.purge_interval_seconds = parse_env::<u64>(
4118 "VERSIONED_MESSAGES_PURGE_INTERVAL_SECONDS",
4119 self.versioned_messages.purge_interval_seconds,
4120 );
4121 self.versioned_messages.purge_batch_size = parse_env::<usize>(
4122 "VERSIONED_MESSAGES_PURGE_BATCH_SIZE",
4123 self.versioned_messages.purge_batch_size,
4124 );
4125 self.versioned_messages.max_purge_per_tick = parse_env::<usize>(
4126 "VERSIONED_MESSAGES_MAX_PURGE_PER_TICK",
4127 self.versioned_messages.max_purge_per_tick,
4128 );
4129 self.annotations.enabled = parse_bool_env("ANNOTATIONS_ENABLED", self.annotations.enabled);
4130
4131 Ok(())
4132 }
4133
4134 pub fn validate(&self) -> Result<(), String> {
4135 if self.unix_socket.enabled {
4136 if self.unix_socket.path.is_empty() {
4137 return Err(
4138 "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
4139 );
4140 }
4141
4142 self.validate_unix_socket_security()?;
4143
4144 if self.ssl.enabled {
4145 tracing::warn!(
4146 "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
4147 );
4148 }
4149
4150 if self.unix_socket.permission_mode > 0o777 {
4151 return Err(format!(
4152 "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
4153 self.unix_socket.permission_mode
4154 ));
4155 }
4156 }
4157
4158 if let Err(e) = self.cleanup.validate() {
4159 return Err(format!("Invalid cleanup configuration: {}", e));
4160 }
4161
4162 if self.history.enabled {
4163 if self.history.max_page_size == 0 {
4164 return Err("history.max_page_size must be greater than 0".to_string());
4165 }
4166 if self.history.writer_shards == 0 {
4167 return Err("history.writer_shards must be greater than 0".to_string());
4168 }
4169 if self.history.writer_queue_capacity == 0 {
4170 return Err("history.writer_queue_capacity must be greater than 0".to_string());
4171 }
4172 if self.history.retention_window_seconds == 0 {
4173 return Err("history.retention_window_seconds must be greater than 0".to_string());
4174 }
4175 if self.history.postgres.table_prefix.trim().is_empty() {
4176 return Err("history.postgres.table_prefix must not be empty".to_string());
4177 }
4178 }
4179
4180 if self.presence_history.enabled {
4181 if self.presence_history.max_page_size == 0 {
4182 return Err("presence_history.max_page_size must be greater than 0".to_string());
4183 }
4184 if self.presence_history.retention_window_seconds == 0 {
4185 return Err(
4186 "presence_history.retention_window_seconds must be greater than 0".to_string(),
4187 );
4188 }
4189 }
4190
4191 if self.versioned_messages.enabled && self.versioned_messages.max_page_size == 0 {
4192 return Err("versioned_messages.max_page_size must be greater than 0".to_string());
4193 }
4194 if self.annotations.enabled && !self.versioned_messages.enabled {
4195 return Err("annotations require versioned_messages.enabled".to_string());
4196 }
4197
4198 if self.adapter.nats.presence_sync_chunk_size == Some(0) {
4199 return Err("nats.presence_sync_chunk_size must be > 0 when set".to_string());
4200 }
4201
4202 Ok(())
4203 }
4204
4205 fn validate_unix_socket_security(&self) -> Result<(), String> {
4206 let path = &self.unix_socket.path;
4207
4208 if path.contains("../") || path.contains("..\\") {
4209 return Err(
4210 "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
4211 );
4212 }
4213
4214 if self.unix_socket.permission_mode & 0o002 != 0 {
4215 warn!(
4216 "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
4217 self.unix_socket.permission_mode
4218 );
4219 }
4220
4221 if self.unix_socket.permission_mode & 0o007 > 0o005 {
4222 warn!(
4223 "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
4224 self.unix_socket.permission_mode
4225 );
4226 }
4227
4228 if self.mode == "production" && path.starts_with("/tmp/") {
4229 warn!(
4230 "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
4231 path
4232 );
4233 }
4234
4235 if !path.starts_with('/') {
4236 return Err(
4237 "Unix socket path must be absolute (start with /) for security and reliability."
4238 .to_string(),
4239 );
4240 }
4241
4242 Ok(())
4243 }
4244}
4245
#[cfg(test)]
mod redis_connection_tests {
    use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};

    /// Fully specified standalone connection used as the base for the URL tests.
    ///
    /// Every field is spelled out so those tests do not depend on whatever
    /// `RedisConnection::default()` happens to return.
    fn standalone() -> RedisConnection {
        RedisConnection {
            host: String::from("127.0.0.1"),
            port: 6379,
            db: 0,
            username: None,
            password: None,
            key_prefix: String::from("sockudo:"),
            sentinels: Vec::new(),
            sentinel_password: None,
            name: String::from("mymaster"),
            cluster: RedisClusterConnection::default(),
            cluster_nodes: Vec::new(),
        }
    }

    /// Sentinel entry for `host` on port 26379.
    fn sentinel(host: &str) -> RedisSentinel {
        RedisSentinel {
            host: host.to_owned(),
            port: 26379,
        }
    }

    /// Asserts that `conn.to_url()` equals `expected`, reporting failures at the call site.
    #[track_caller]
    fn assert_url(conn: RedisConnection, expected: &str) {
        assert_eq!(conn.to_url(), expected);
    }

    #[test]
    fn test_standard_url_basic() {
        assert_url(standalone(), "redis://127.0.0.1:6379/0");
    }

    #[test]
    fn test_standard_url_with_password() {
        let conn = RedisConnection {
            db: 2,
            password: Some(String::from("secret")),
            ..standalone()
        };
        assert_url(conn, "redis://:secret@127.0.0.1:6379/2");
    }

    #[test]
    fn test_standard_url_with_username_and_password() {
        let conn = RedisConnection {
            host: String::from("redis.example.com"),
            port: 6380,
            db: 1,
            username: Some(String::from("admin")),
            password: Some(String::from("pass123")),
            ..standalone()
        };
        assert_url(conn, "redis://admin:pass123@redis.example.com:6380/1");
    }

    #[test]
    fn test_standard_url_with_special_chars_in_password() {
        let conn = RedisConnection {
            password: Some(String::from("pass@word#123")),
            ..standalone()
        };
        assert_url(conn, "redis://:pass%40word%23123@127.0.0.1:6379/0");
    }

    #[test]
    fn test_is_sentinel_configured_false() {
        let configured = RedisConnection::default().is_sentinel_configured();
        assert!(!configured);
    }

    #[test]
    fn test_is_sentinel_configured_true() {
        let conn = RedisConnection {
            sentinels: vec![sentinel("sentinel1")],
            ..RedisConnection::default()
        };
        let configured = conn.is_sentinel_configured();
        assert!(configured);
    }

    #[test]
    fn test_sentinel_url_basic() {
        let conn = RedisConnection {
            sentinels: vec![sentinel("sentinel1"), sentinel("sentinel2")],
            ..standalone()
        };
        assert_url(
            conn,
            "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0",
        );
    }

    #[test]
    fn test_sentinel_url_with_sentinel_password() {
        let conn = RedisConnection {
            sentinels: vec![sentinel("sentinel1")],
            sentinel_password: Some(String::from("sentinelpass")),
            ..standalone()
        };
        assert_url(
            conn,
            "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0",
        );
    }

    #[test]
    fn test_sentinel_url_with_master_password() {
        let conn = RedisConnection {
            db: 1,
            password: Some(String::from("masterpass")),
            sentinels: vec![sentinel("sentinel1")],
            ..standalone()
        };
        assert_url(
            conn,
            "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass",
        );
    }

    #[test]
    fn test_sentinel_url_with_all_auth() {
        // The second sentinel listens on a non-default port, so it is written out in full.
        let second_sentinel = RedisSentinel {
            host: String::from("sentinel2"),
            port: 26380,
        };
        let conn = RedisConnection {
            db: 2,
            username: Some(String::from("redisuser")),
            password: Some(String::from("redispass")),
            sentinels: vec![sentinel("sentinel1"), second_sentinel],
            sentinel_password: Some(String::from("sentinelauth")),
            name: String::from("production-master"),
            ..standalone()
        };
        assert_url(
            conn,
            "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser",
        );
    }

    #[test]
    fn test_sentinel_to_host_port() {
        let host_port = RedisSentinel {
            host: String::from("sentinel.example.com"),
            port: 26379,
        }
        .to_host_port();
        assert_eq!(host_port, "sentinel.example.com:26379");
    }

    #[test]
    fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
        let nodes = vec![
            ClusterNode {
                host: String::from("node1.secure-cluster.com"),
                port: 7000,
            },
            ClusterNode {
                host: String::from("redis://node2.secure-cluster.com:7001"),
                port: 7001,
            },
            ClusterNode {
                host: String::from("rediss://node3.secure-cluster.com"),
                port: 7002,
            },
        ];
        let conn = RedisConnection {
            cluster: RedisClusterConnection {
                nodes,
                username: None,
                password: Some(String::from("cluster-secret")),
                use_tls: true,
            },
            ..RedisConnection::default()
        };

        let expected = vec![
            "rediss://:cluster-secret@node1.secure-cluster.com:7000",
            "redis://:cluster-secret@node2.secure-cluster.com:7001",
            "rediss://:cluster-secret@node3.secure-cluster.com:7002",
        ];
        assert_eq!(conn.cluster_node_urls(), expected);
    }

    #[test]
    fn test_cluster_node_urls_fallback_to_legacy_nodes() {
        let legacy_node = ClusterNode {
            host: String::from("legacy-node.example.com"),
            port: 7000,
        };
        let conn = RedisConnection {
            password: Some(String::from("fallback-secret")),
            cluster_nodes: vec![legacy_node],
            ..RedisConnection::default()
        };

        let expected = vec!["redis://:fallback-secret@legacy-node.example.com:7000"];
        assert_eq!(conn.cluster_node_urls(), expected);
    }

    #[test]
    fn test_normalize_cluster_seed_urls() {
        let conn = RedisConnection {
            cluster: RedisClusterConnection {
                nodes: Vec::new(),
                username: Some(String::from("svc-user")),
                password: Some(String::from("svc-pass")),
                use_tls: true,
            },
            ..RedisConnection::default()
        };

        let seeds: Vec<String> = [
            "node1.example.com:7000",
            "redis://node2.example.com:7001",
            "rediss://node3.example.com",
        ]
        .iter()
        .map(|seed| seed.to_string())
        .collect();

        let expected = vec![
            "rediss://svc-user:svc-pass@node1.example.com:7000",
            "redis://svc-user:svc-pass@node2.example.com:7001",
            "rediss://svc-user:svc-pass@node3.example.com:6379",
        ];
        assert_eq!(conn.normalize_cluster_seed_urls(&seeds), expected);
    }
}
4540
#[cfg(test)]
mod cluster_node_tests {
    use super::ClusterNode;

    /// Asserts that `node.to_url()` equals `expected`, reporting failures at the call site.
    #[track_caller]
    fn assert_url(node: ClusterNode, expected: &str) {
        assert_eq!(node.to_url(), expected);
    }

    #[test]
    fn test_to_url_basic_host() {
        let node = ClusterNode {
            host: String::from("localhost"),
            port: 6379,
        };
        assert_url(node, "redis://localhost:6379");
    }

    #[test]
    fn test_to_url_ip_address() {
        let node = ClusterNode {
            host: String::from("127.0.0.1"),
            port: 6379,
        };
        assert_url(node, "redis://127.0.0.1:6379");
    }

    #[test]
    fn test_to_url_with_redis_protocol() {
        let node = ClusterNode {
            host: String::from("redis://example.com"),
            port: 6379,
        };
        assert_url(node, "redis://example.com:6379");
    }

    #[test]
    fn test_to_url_with_rediss_protocol() {
        let node = ClusterNode {
            host: String::from("rediss://secure.example.com"),
            port: 6379,
        };
        assert_url(node, "rediss://secure.example.com:6379");
    }

    #[test]
    fn test_to_url_with_rediss_protocol_and_port_in_url() {
        // The port embedded in the host string is expected to win over the `port` field.
        let node = ClusterNode {
            host: String::from("rediss://secure.example.com:7000"),
            port: 6379,
        };
        assert_url(node, "rediss://secure.example.com:7000");
    }

    #[test]
    fn test_to_url_with_redis_protocol_and_port_in_url() {
        let node = ClusterNode {
            host: String::from("redis://example.com:7001"),
            port: 6379,
        };
        assert_url(node, "redis://example.com:7001");
    }

    #[test]
    fn test_to_url_with_trailing_whitespace() {
        // Surrounding whitespace in the host must not appear in the URL.
        let node = ClusterNode {
            host: String::from(" rediss://secure.example.com "),
            port: 6379,
        };
        assert_url(node, "rediss://secure.example.com:6379");
    }

    #[test]
    fn test_to_url_custom_port() {
        let node = ClusterNode {
            host: String::from("redis-cluster.example.com"),
            port: 7000,
        };
        assert_url(node, "redis://redis-cluster.example.com:7000");
    }

    #[test]
    fn test_to_url_plain_host_with_port_in_host_field() {
        let node = ClusterNode {
            host: String::from("redis-cluster.example.com:7010"),
            port: 7000,
        };
        assert_url(node, "redis://redis-cluster.example.com:7010");
    }

    #[test]
    fn test_to_url_with_options_adds_auth_and_tls() {
        let node = ClusterNode {
            host: String::from("node.example.com"),
            port: 7000,
        };
        let url = node.to_url_with_options(true, Some("svc-user"), Some("secret"));
        assert_eq!(url, "rediss://svc-user:secret@node.example.com:7000");
    }

    #[test]
    fn test_to_url_with_options_keeps_embedded_auth() {
        // Credentials already present in the host string take precedence over the shared ones.
        let node = ClusterNode {
            host: String::from("rediss://:node-secret@node.example.com:7000"),
            port: 7000,
        };
        let url = node.to_url_with_options(true, Some("global-user"), Some("global-secret"));
        assert_eq!(url, "rediss://:node-secret@node.example.com:7000");
    }

    #[test]
    fn test_from_seed_parses_plain_host_port() {
        let parsed = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
        assert_eq!(
            (parsed.host.as_str(), parsed.port),
            ("cluster-node-1", 7005)
        );
    }

    #[test]
    fn test_from_seed_keeps_scheme_urls() {
        let parsed =
            ClusterNode::from_seed("rediss://secure.example.com:7005").expect("node should parse");
        assert_eq!(
            (parsed.host.as_str(), parsed.port),
            ("rediss://secure.example.com:7005", 7005)
        );
    }

    #[test]
    fn test_to_url_aws_elasticache_hostname() {
        let node = ClusterNode {
            host: String::from("rediss://my-cluster.use1.cache.amazonaws.com"),
            port: 6379,
        };
        assert_url(node, "rediss://my-cluster.use1.cache.amazonaws.com:6379");
    }

    #[test]
    fn test_to_url_with_ipv6_no_port() {
        let node = ClusterNode {
            host: String::from("rediss://[::1]"),
            port: 6379,
        };
        assert_url(node, "rediss://[::1]:6379");
    }

    #[test]
    fn test_to_url_with_ipv6_and_port_in_url() {
        let node = ClusterNode {
            host: String::from("rediss://[::1]:7000"),
            port: 6379,
        };
        assert_url(node, "rediss://[::1]:7000");
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_no_port() {
        let node = ClusterNode {
            host: String::from("rediss://[2001:db8::1]"),
            port: 6379,
        };
        assert_url(node, "rediss://[2001:db8::1]:6379");
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_with_port() {
        let node = ClusterNode {
            host: String::from("rediss://[2001:db8::1]:7000"),
            port: 6379,
        };
        assert_url(node, "rediss://[2001:db8::1]:7000");
    }

    #[test]
    fn test_to_url_with_redis_protocol_ipv6() {
        let node = ClusterNode {
            host: String::from("redis://[::1]"),
            port: 6379,
        };
        assert_url(node, "redis://[::1]:6379");
    }
}
4722
#[cfg(test)]
mod cors_config_tests {
    use super::CorsConfig;

    /// Deserializes a `CorsConfig` from a JSON document.
    fn parse_cors(json: &str) -> sonic_rs::Result<CorsConfig> {
        sonic_rs::from_str(json)
    }

    /// Asserts that `json` deserializes successfully, reporting failures at the call site.
    #[track_caller]
    fn assert_accepted(json: &str) {
        assert!(parse_cors(json).is_ok());
    }

    /// Asserts that `json` fails to deserialize, reporting failures at the call site.
    #[track_caller]
    fn assert_rejected(json: &str) {
        assert!(parse_cors(json).is_err());
    }

    #[test]
    fn test_deserialize_valid_exact_origins() {
        let json = r#"{"origin": ["https://example.com", "https://other.com"]}"#;
        let config = parse_cors(json).unwrap();
        assert_eq!(config.origin.len(), 2);
    }

    #[test]
    fn test_deserialize_valid_wildcard_patterns() {
        let json = r#"{"origin": ["*.example.com", "https://*.staging.example.com"]}"#;
        let config = parse_cors(json).unwrap();
        assert_eq!(config.origin.len(), 2);
    }

    #[test]
    fn test_deserialize_allows_special_markers() {
        assert_accepted(r#"{"origin": ["*"]}"#);
        assert_accepted(r#"{"origin": ["Any"]}"#);
        assert_accepted(r#"{"origin": ["any"]}"#);
        assert_accepted(r#"{"origin": ["*", "https://example.com"]}"#);
    }

    #[test]
    fn test_deserialize_rejects_invalid_patterns() {
        assert_rejected(r#"{"origin": ["*.*bad"]}"#);
        assert_rejected(r#"{"origin": ["*."]}"#);
        assert_rejected(r#"{"origin": [""]}"#);
        assert_rejected(r#"{"origin": ["https://"]}"#);
    }

    #[test]
    fn test_deserialize_rejects_mixed_valid_and_invalid() {
        // A single invalid entry must fail the whole list.
        assert_rejected(r#"{"origin": ["https://good.com", "*.*bad"]}"#);
    }
}
4766}