1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12 D: serde::Deserializer<'de>,
13{
14 use serde::de::{self};
15
16 let s = String::deserialize(deserializer)?;
17
18 if !s.chars().all(|c| c.is_digit(8)) {
20 return Err(de::Error::custom(format!(
21 "invalid octal permission mode '{}': must contain only digits 0-7",
22 s
23 )));
24 }
25
26 let mode = u32::from_str_radix(&s, 8)
28 .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30 if mode > 0o777 {
32 return Err(de::Error::custom(format!(
33 "permission mode '{}' exceeds maximum value 777",
34 s
35 )));
36 }
37
38 Ok(mode)
39}
40
41fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43 driver_str: String,
44 default_driver: T,
45 driver_name: &str,
46) -> T
47where
48 <T as FromStr>::Err: std::fmt::Debug,
49{
50 match T::from_str(&driver_str.to_lowercase()) {
51 Ok(driver_enum) => driver_enum,
52 Err(e) => {
53 warn!(
54 "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55 driver_name, driver_str, e, default_driver
56 );
57 default_driver
58 }
59 }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63 if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64 db_conn.pool_min = Some(min);
65 }
66 if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67 db_conn.pool_max = Some(max);
68 }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
74#[serde(rename_all = "lowercase")]
75pub enum AdapterDriver {
76 #[default]
77 Local,
78 Redis,
79 #[serde(rename = "redis-cluster")]
80 RedisCluster,
81 Nats,
82 Pulsar,
83 RabbitMq,
84 #[serde(rename = "google-pubsub")]
85 GooglePubSub,
86 Kafka,
87 Iggy,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91#[serde(default)]
92pub struct DynamoDbSettings {
93 pub region: String,
94 pub table_name: String,
95 pub endpoint_url: Option<String>,
96 pub aws_access_key_id: Option<String>,
97 pub aws_secret_access_key: Option<String>,
98 pub aws_profile_name: Option<String>,
99}
100
101impl Default for DynamoDbSettings {
102 fn default() -> Self {
103 Self {
104 region: "us-east-1".to_string(),
105 table_name: "sockudo-applications".to_string(),
106 endpoint_url: None,
107 aws_access_key_id: None,
108 aws_secret_access_key: None,
109 aws_profile_name: None,
110 }
111 }
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
115#[serde(default)]
116pub struct ScyllaDbSettings {
117 pub nodes: Vec<String>,
118 pub keyspace: String,
119 pub table_name: String,
120 pub username: Option<String>,
121 pub password: Option<String>,
122 pub replication_class: String,
123 pub replication_factor: u32,
124}
125
126impl Default for ScyllaDbSettings {
127 fn default() -> Self {
128 Self {
129 nodes: vec!["127.0.0.1:9042".to_string()],
130 keyspace: "sockudo".to_string(),
131 table_name: "applications".to_string(),
132 username: None,
133 password: None,
134 replication_class: "SimpleStrategy".to_string(),
135 replication_factor: 3,
136 }
137 }
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize)]
141#[serde(default)]
142pub struct SurrealDbSettings {
143 pub url: String,
144 pub namespace: String,
145 pub database: String,
146 pub username: String,
147 pub password: String,
148 pub table_name: String,
149 pub cache_ttl: u64,
150 pub cache_max_capacity: u64,
151}
152
153impl Default for SurrealDbSettings {
154 fn default() -> Self {
155 Self {
156 url: "ws://127.0.0.1:8000".to_string(),
157 namespace: "sockudo".to_string(),
158 database: "sockudo".to_string(),
159 username: "root".to_string(),
160 password: "root".to_string(),
161 table_name: "applications".to_string(),
162 cache_ttl: 300,
163 cache_max_capacity: 100,
164 }
165 }
166}
167
168impl FromStr for AdapterDriver {
169 type Err = String;
170 fn from_str(s: &str) -> Result<Self, Self::Err> {
171 match s.to_lowercase().as_str() {
172 "local" => Ok(AdapterDriver::Local),
173 "redis" => Ok(AdapterDriver::Redis),
174 "redis-cluster" => Ok(AdapterDriver::RedisCluster),
175 "nats" => Ok(AdapterDriver::Nats),
176 "pulsar" => Ok(AdapterDriver::Pulsar),
177 "rabbitmq" | "rabbit-mq" => Ok(AdapterDriver::RabbitMq),
178 "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(AdapterDriver::GooglePubSub),
179 "kafka" => Ok(AdapterDriver::Kafka),
180 "iggy" | "apache-iggy" | "apache_iggy" => Ok(AdapterDriver::Iggy),
181 _ => Err(format!("Unknown adapter driver: {s}")),
182 }
183 }
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
187#[serde(rename_all = "lowercase")]
188pub enum AppManagerDriver {
189 #[default]
190 Memory,
191 Mysql,
192 Dynamodb,
193 PgSql,
194 SurrealDb,
195 ScyllaDb,
196}
197impl FromStr for AppManagerDriver {
198 type Err = String;
199 fn from_str(s: &str) -> Result<Self, Self::Err> {
200 match s.to_lowercase().as_str() {
201 "memory" => Ok(AppManagerDriver::Memory),
202 "mysql" => Ok(AppManagerDriver::Mysql),
203 "dynamodb" => Ok(AppManagerDriver::Dynamodb),
204 "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
205 "surreal" | "surrealdb" => Ok(AppManagerDriver::SurrealDb),
206 "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
207 _ => Err(format!("Unknown app manager driver: {s}")),
208 }
209 }
210}
211
212#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
213#[serde(rename_all = "lowercase")]
214pub enum CacheDriver {
215 #[default]
216 Memory,
217 Redis,
218 #[serde(rename = "redis-cluster")]
219 RedisCluster,
220 None,
221}
222
223impl FromStr for CacheDriver {
224 type Err = String;
225 fn from_str(s: &str) -> Result<Self, Self::Err> {
226 match s.to_lowercase().as_str() {
227 "memory" => Ok(CacheDriver::Memory),
228 "redis" => Ok(CacheDriver::Redis),
229 "redis-cluster" => Ok(CacheDriver::RedisCluster),
230 "none" => Ok(CacheDriver::None),
231 _ => Err(format!("Unknown cache driver: {s}")),
232 }
233 }
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
237#[serde(rename_all = "lowercase")]
238pub enum QueueDriver {
239 Memory,
240 #[default]
241 Redis,
242 #[serde(rename = "redis-cluster")]
243 RedisCluster,
244 Nats,
245 Pulsar,
246 RabbitMq,
247 #[serde(rename = "google-pubsub")]
248 GooglePubSub,
249 Kafka,
250 Iggy,
251 Sqs,
252 Sns,
253 None,
254}
255
256impl FromStr for QueueDriver {
257 type Err = String;
258 fn from_str(s: &str) -> Result<Self, Self::Err> {
259 match s.to_lowercase().as_str() {
260 "memory" => Ok(QueueDriver::Memory),
261 "redis" => Ok(QueueDriver::Redis),
262 "redis-cluster" => Ok(QueueDriver::RedisCluster),
263 "nats" => Ok(QueueDriver::Nats),
264 "pulsar" => Ok(QueueDriver::Pulsar),
265 "rabbitmq" | "rabbit-mq" => Ok(QueueDriver::RabbitMq),
266 "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(QueueDriver::GooglePubSub),
267 "kafka" => Ok(QueueDriver::Kafka),
268 "iggy" | "apache-iggy" | "apache_iggy" => Ok(QueueDriver::Iggy),
269 "sqs" => Ok(QueueDriver::Sqs),
270 "sns" => Ok(QueueDriver::Sns),
271 "none" => Ok(QueueDriver::None),
272 _ => Err(format!("Unknown queue driver: {s}")),
273 }
274 }
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
278#[serde(rename_all = "snake_case")]
279pub enum DeltaCoordinationBackend {
280 #[default]
281 Auto,
282 None,
283 Redis,
284 RedisCluster,
285 Nats,
286}
287
288impl FromStr for DeltaCoordinationBackend {
289 type Err = String;
290
291 fn from_str(s: &str) -> Result<Self, Self::Err> {
292 match s.trim().to_ascii_lowercase().as_str() {
293 "auto" => Ok(Self::Auto),
294 "none" => Ok(Self::None),
295 "redis" => Ok(Self::Redis),
296 "redis-cluster" | "redis_cluster" => Ok(Self::RedisCluster),
297 "nats" => Ok(Self::Nats),
298 _ => Err(format!("Unknown delta coordination backend: {s}")),
299 }
300 }
301}
302
303impl AsRef<str> for QueueDriver {
304 fn as_ref(&self) -> &str {
305 match self {
306 QueueDriver::Memory => "memory",
307 QueueDriver::Redis => "redis",
308 QueueDriver::RedisCluster => "redis-cluster",
309 QueueDriver::Nats => "nats",
310 QueueDriver::Pulsar => "pulsar",
311 QueueDriver::RabbitMq => "rabbitmq",
312 QueueDriver::GooglePubSub => "google-pubsub",
313 QueueDriver::Kafka => "kafka",
314 QueueDriver::Iggy => "iggy",
315 QueueDriver::Sqs => "sqs",
316 QueueDriver::Sns => "sns",
317 QueueDriver::None => "none",
318 }
319 }
320}
321
322#[derive(Debug, Clone, Serialize, Deserialize)]
323#[serde(default)]
324pub struct RedisClusterQueueConfig {
325 pub concurrency: u32,
326 pub prefix: Option<String>,
327 pub nodes: Vec<String>,
328 pub request_timeout_ms: u64,
329}
330
331impl Default for RedisClusterQueueConfig {
332 fn default() -> Self {
333 Self {
334 concurrency: 5,
335 prefix: Some("sockudo_queue:".to_string()),
336 nodes: vec!["redis://127.0.0.1:6379".to_string()],
337 request_timeout_ms: 5000,
338 }
339 }
340}
341
342#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
343#[serde(rename_all = "lowercase")]
344pub enum MetricsDriver {
345 #[default]
346 Prometheus,
347}
348
349#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
350#[serde(rename_all = "lowercase")]
351pub enum LogOutputFormat {
352 #[default]
353 Human,
354 Json,
355}
356
357impl FromStr for LogOutputFormat {
358 type Err = String;
359 fn from_str(s: &str) -> Result<Self, Self::Err> {
360 match s.to_lowercase().as_str() {
361 "human" => Ok(LogOutputFormat::Human),
362 "json" => Ok(LogOutputFormat::Json),
363 _ => Err(format!("Unknown log output format: {s}")),
364 }
365 }
366}
367
368impl FromStr for MetricsDriver {
369 type Err = String;
370 fn from_str(s: &str) -> Result<Self, Self::Err> {
371 match s.to_lowercase().as_str() {
372 "prometheus" => Ok(MetricsDriver::Prometheus),
373 _ => Err(format!("Unknown metrics driver: {s}")),
374 }
375 }
376}
377
378impl AsRef<str> for MetricsDriver {
379 fn as_ref(&self) -> &str {
380 match self {
381 MetricsDriver::Prometheus => "prometheus",
382 }
383 }
384}
385
386#[derive(Debug, Clone, Serialize, Deserialize)]
388#[serde(default)]
389pub struct ServerOptions {
390 pub adapter: AdapterConfig,
391 pub app_manager: AppManagerConfig,
392 pub cache: CacheConfig,
393 pub channel_limits: ChannelLimits,
394 pub cors: CorsConfig,
395 pub database: DatabaseConfig,
396 pub database_pooling: DatabasePooling,
397 pub debug: bool,
398 pub event_limits: EventLimits,
399 pub host: String,
400 pub http_api: HttpApiConfig,
401 pub instance: InstanceConfig,
402 pub logging: Option<LoggingConfig>,
403 pub metrics: MetricsConfig,
404 pub mode: String,
405 pub port: u16,
406 pub path_prefix: String,
407 pub presence: PresenceConfig,
408 pub queue: QueueConfig,
409 pub rate_limiter: RateLimiterConfig,
410 pub shutdown_grace_period: u64,
411 pub ssl: SslConfig,
412 pub user_authentication_timeout: u64,
413 pub webhooks: WebhooksConfig,
414 pub websocket_max_payload_kb: u32,
415 pub cleanup: CleanupConfig,
416 pub activity_timeout: u64,
417 pub cluster_health: ClusterHealthConfig,
418 pub unix_socket: UnixSocketConfig,
419 pub delta_compression: DeltaCompressionOptionsConfig,
420 pub tag_filtering: TagFilteringConfig,
421 pub websocket: WebSocketConfig,
422 pub connection_recovery: ConnectionRecoveryConfig,
423 pub history: HistoryConfig,
424 pub presence_history: PresenceHistoryConfig,
425 pub idempotency: IdempotencyConfig,
426 pub ephemeral: EphemeralConfig,
427 pub echo_control: EchoControlConfig,
428 pub event_name_filtering: EventNameFilteringConfig,
429 pub versioned_messages: VersionedMessagesConfig,
430 pub annotations: AnnotationsConfig,
431 pub push: PushConfig,
432 pub health_check_timeout_ms: u64,
435}
436
437#[derive(Debug, Clone, Serialize, Deserialize)]
440#[serde(default)]
441pub struct SqsQueueConfig {
442 pub region: String,
443 pub queue_url_prefix: Option<String>,
444 pub visibility_timeout: i32,
445 pub endpoint_url: Option<String>,
446 pub max_messages: i32,
447 pub wait_time_seconds: i32,
448 pub concurrency: u32,
449 pub fifo: bool,
450 pub message_group_id: Option<String>,
451}
452
453#[derive(Debug, Clone, Serialize, Deserialize)]
454#[serde(default)]
455pub struct SnsQueueConfig {
456 pub region: String,
457 pub topic_arn: String,
458 pub endpoint_url: Option<String>,
459}
460
461#[derive(Debug, Clone, Serialize, Deserialize)]
462#[serde(default)]
463pub struct AdapterConfig {
464 pub driver: AdapterDriver,
465 pub redis: RedisAdapterConfig,
466 pub cluster: RedisClusterAdapterConfig,
467 pub nats: NatsAdapterConfig,
468 pub pulsar: PulsarAdapterConfig,
469 pub rabbitmq: RabbitMqAdapterConfig,
470 pub google_pubsub: GooglePubSubAdapterConfig,
471 pub kafka: KafkaAdapterConfig,
472 pub iggy: IggyConfig,
473 #[serde(default = "default_buffer_multiplier_per_cpu")]
474 pub buffer_multiplier_per_cpu: usize,
475 pub cluster_health: ClusterHealthConfig,
476 #[serde(default = "default_enable_socket_counting")]
477 pub enable_socket_counting: bool,
478 #[serde(default = "default_fallback_to_local")]
479 pub fallback_to_local: bool,
480 #[serde(default = "default_aggregate_counts")]
484 pub aggregate_counts: bool,
485}
486
/// Serde default for `AdapterConfig::aggregate_counts`: disabled.
fn default_aggregate_counts() -> bool {
    const AGGREGATE_COUNTS: bool = false;
    AGGREGATE_COUNTS
}
490
/// Serde default for `AdapterConfig::enable_socket_counting`: enabled.
fn default_enable_socket_counting() -> bool {
    const ENABLE_SOCKET_COUNTING: bool = true;
    ENABLE_SOCKET_COUNTING
}
494
/// Serde default for `AdapterConfig::fallback_to_local`: enabled.
fn default_fallback_to_local() -> bool {
    const FALLBACK_TO_LOCAL: bool = true;
    FALLBACK_TO_LOCAL
}
498
/// Serde default for `AdapterConfig::buffer_multiplier_per_cpu`: 64.
fn default_buffer_multiplier_per_cpu() -> usize {
    const BUFFER_MULTIPLIER_PER_CPU: usize = 64;
    BUFFER_MULTIPLIER_PER_CPU
}
502
503impl Default for AdapterConfig {
504 fn default() -> Self {
505 Self {
506 driver: AdapterDriver::default(),
507 redis: RedisAdapterConfig::default(),
508 cluster: RedisClusterAdapterConfig::default(),
509 nats: NatsAdapterConfig::default(),
510 pulsar: PulsarAdapterConfig::default(),
511 rabbitmq: RabbitMqAdapterConfig::default(),
512 google_pubsub: GooglePubSubAdapterConfig::default(),
513 kafka: KafkaAdapterConfig::default(),
514 iggy: IggyConfig::default(),
515 buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
516 cluster_health: ClusterHealthConfig::default(),
517 enable_socket_counting: default_enable_socket_counting(),
518 fallback_to_local: default_fallback_to_local(),
519 aggregate_counts: default_aggregate_counts(),
520 }
521 }
522}
523
524#[derive(Debug, Clone, Serialize, Deserialize)]
525#[serde(default)]
526pub struct RedisAdapterConfig {
527 pub requests_timeout: u64,
528 pub prefix: String,
529 pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
530 pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
531 pub cluster_mode: bool,
532}
533
534#[derive(Debug, Clone, Serialize, Deserialize)]
535#[serde(default)]
536pub struct RedisClusterAdapterConfig {
537 pub nodes: Vec<String>,
538 pub prefix: String,
539 pub request_timeout_ms: u64,
540 pub use_connection_manager: bool,
541 #[serde(default)]
542 pub use_sharded_pubsub: bool,
543}
544
545#[derive(Debug, Clone, Serialize, Deserialize)]
546#[serde(default)]
547pub struct NatsAdapterConfig {
548 pub servers: Vec<String>,
549 pub prefix: String,
550 pub request_timeout_ms: u64,
551 pub username: Option<String>,
552 pub password: Option<String>,
553 pub token: Option<String>,
554 pub connection_timeout_ms: u64,
555 pub nodes_number: Option<u32>,
556 pub discovery_max_wait_ms: u64,
557 pub discovery_idle_wait_ms: u64,
558 pub subscription_capacity: Option<usize>,
559 pub client_capacity: Option<usize>,
560 pub max_reconnects: Option<usize>,
561 pub presence_sync_chunk_size: Option<usize>,
562}
563
564#[derive(Debug, Clone, Serialize, Deserialize)]
565#[serde(default)]
566pub struct PulsarAdapterConfig {
567 pub url: String,
568 pub prefix: String,
569 pub request_timeout_ms: u64,
570 pub token: Option<String>,
571 pub nodes_number: Option<u32>,
572}
573
574#[derive(Debug, Clone, Serialize, Deserialize)]
575#[serde(default)]
576pub struct RabbitMqAdapterConfig {
577 pub url: String,
578 pub prefix: String,
579 pub request_timeout_ms: u64,
580 pub connection_timeout_ms: u64,
581 pub nodes_number: Option<u32>,
582}
583
584#[derive(Debug, Clone, Serialize, Deserialize)]
585#[serde(default)]
586pub struct GooglePubSubAdapterConfig {
587 pub project_id: String,
588 pub prefix: String,
589 pub request_timeout_ms: u64,
590 pub emulator_host: Option<String>,
591 pub nodes_number: Option<u32>,
592}
593
594#[derive(Debug, Clone, Serialize, Deserialize)]
595#[serde(default)]
596pub struct KafkaAdapterConfig {
597 pub brokers: Vec<String>,
598 pub prefix: String,
599 pub request_timeout_ms: u64,
600 pub security_protocol: Option<String>,
601 pub sasl_mechanism: Option<String>,
602 pub sasl_username: Option<String>,
603 pub sasl_password: Option<String>,
604 pub nodes_number: Option<u32>,
605}
606
607#[derive(Debug, Clone, Serialize, Deserialize)]
608#[serde(default)]
609pub struct IggyConfig {
610 pub connection_string: String,
611 pub username: Option<String>,
612 pub password: Option<String>,
613 pub consumer_name: Option<String>,
614 pub stream: String,
615 pub topic_prefix: String,
616 pub queue_topic_prefix: String,
617 pub consumer_group_prefix: String,
618 pub request_timeout_ms: u64,
619 pub poll_interval_ms: u64,
620 pub poll_batch_size: u32,
621 pub partitions_count: u32,
622 pub partition_id: u32,
623 pub auto_create: bool,
624 pub start_from_latest: bool,
625 pub nodes_number: Option<u32>,
626}
627
628#[derive(Debug, Clone, Serialize, Deserialize, Default)]
629#[serde(default)]
630pub struct AppManagerConfig {
631 pub driver: AppManagerDriver,
632 pub array: ArrayConfig,
633 pub cache: CacheSettings,
634 pub scylladb: ScyllaDbSettings,
635}
636
637#[derive(Debug, Clone, Serialize, Deserialize, Default)]
638#[serde(default)]
639pub struct ArrayConfig {
640 pub apps: Vec<App>,
641}
642
643#[derive(Debug, Clone, Serialize, Deserialize)]
644#[serde(default)]
645pub struct CacheSettings {
646 pub enabled: bool,
647 pub ttl: u64,
648}
649
650#[derive(Debug, Clone, Serialize, Deserialize)]
651#[serde(default)]
652pub struct MemoryCacheOptions {
653 pub ttl: u64,
654 pub cleanup_interval: u64,
655 pub max_capacity: u64,
656}
657
658#[derive(Debug, Clone, Serialize, Deserialize)]
659#[serde(default)]
660pub struct CacheConfig {
661 pub driver: CacheDriver,
662 pub redis: RedisConfig,
663 pub memory: MemoryCacheOptions,
664}
665
666#[derive(Debug, Clone, Serialize, Deserialize, Default)]
667#[serde(default)]
668pub struct RedisConfig {
669 pub prefix: Option<String>,
670 pub url_override: Option<String>,
671 pub cluster_mode: bool,
672}
673
674#[derive(Debug, Clone, Serialize, Deserialize)]
675#[serde(default)]
676pub struct ChannelLimits {
677 pub max_name_length: u32,
678 pub cache_ttl: u64,
679}
680
681#[derive(Debug, Clone, Serialize, Deserialize)]
682#[serde(default)]
683pub struct CorsConfig {
684 pub credentials: bool,
685 #[serde(deserialize_with = "deserialize_and_validate_cors_origins")]
686 pub origin: Vec<String>,
687 pub methods: Vec<String>,
688 pub allowed_headers: Vec<String>,
689}
690
691fn deserialize_and_validate_cors_origins<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
692where
693 D: serde::Deserializer<'de>,
694{
695 use serde::de::Error;
696 let origins = Vec::<String>::deserialize(deserializer)?;
697
698 if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&origins) {
699 return Err(D::Error::custom(format!(
700 "CORS origin pattern validation failed: {}",
701 e
702 )));
703 }
704
705 Ok(origins)
706}
707
708#[derive(Debug, Clone, Serialize, Deserialize, Default)]
709#[serde(default)]
710pub struct DatabaseConfig {
711 pub mysql: DatabaseConnection,
712 pub postgres: DatabaseConnection,
713 pub redis: RedisConnection,
714 pub dynamodb: DynamoDbSettings,
715 pub surrealdb: SurrealDbSettings,
716 pub scylladb: ScyllaDbSettings,
717}
718
719#[derive(Debug, Clone, Serialize, Deserialize)]
720#[serde(default)]
721pub struct DatabaseConnection {
722 pub host: String,
723 pub port: u16,
724 pub username: String,
725 pub password: String,
726 pub database: String,
727 pub table_name: String,
728 pub connection_pool_size: u32,
729 pub pool_min: Option<u32>,
730 pub pool_max: Option<u32>,
731 pub cache_ttl: u64,
732 pub cache_cleanup_interval: u64,
733 pub cache_max_capacity: u64,
734}
735
736#[derive(Debug, Clone, Serialize, Deserialize)]
737#[serde(default)]
738pub struct RedisConnection {
739 pub host: String,
740 pub port: u16,
741 pub db: u32,
742 pub username: Option<String>,
743 pub password: Option<String>,
744 pub key_prefix: String,
745 pub sentinels: Vec<RedisSentinel>,
746 pub sentinel_password: Option<String>,
747 pub name: String,
748 pub cluster: RedisClusterConnection,
749 pub cluster_nodes: Vec<ClusterNode>,
751}
752
753#[derive(Debug, Clone, Serialize, Deserialize)]
754#[serde(default)]
755pub struct RedisSentinel {
756 pub host: String,
757 pub port: u16,
758}
759
760#[derive(Debug, Clone, Serialize, Deserialize, Default)]
761#[serde(default)]
762pub struct RedisClusterConnection {
763 pub nodes: Vec<ClusterNode>,
764 pub username: Option<String>,
765 pub password: Option<String>,
766 #[serde(alias = "useTLS")]
767 pub use_tls: bool,
768}
769
770impl RedisConnection {
771 pub fn is_sentinel_configured(&self) -> bool {
773 !self.sentinels.is_empty()
774 }
775
776 pub fn to_url(&self) -> String {
778 if self.is_sentinel_configured() {
779 self.build_sentinel_url()
780 } else {
781 self.build_standard_url()
782 }
783 }
784
785 fn build_standard_url(&self) -> String {
786 let (scheme, host) = if self.host.starts_with("rediss://") {
788 ("rediss://", self.host.trim_start_matches("rediss://"))
789 } else if self.host.starts_with("redis://") {
790 ("redis://", self.host.trim_start_matches("redis://"))
791 } else {
792 ("redis://", self.host.as_str())
793 };
794
795 let mut url = String::from(scheme);
796
797 if let Some(ref username) = self.username {
798 url.push_str(username);
799 if let Some(ref password) = self.password {
800 url.push(':');
801 url.push_str(&urlencoding::encode(password));
802 }
803 url.push('@');
804 } else if let Some(ref password) = self.password {
805 url.push(':');
806 url.push_str(&urlencoding::encode(password));
807 url.push('@');
808 }
809
810 url.push_str(host);
811 url.push(':');
812 url.push_str(&self.port.to_string());
813 url.push('/');
814 url.push_str(&self.db.to_string());
815
816 url
817 }
818
819 fn build_sentinel_url(&self) -> String {
820 let mut url = String::from("redis+sentinel://");
821
822 if let Some(ref sentinel_password) = self.sentinel_password {
823 url.push(':');
824 url.push_str(&urlencoding::encode(sentinel_password));
825 url.push('@');
826 }
827
828 let sentinel_hosts: Vec<String> = self
829 .sentinels
830 .iter()
831 .map(|s| format!("{}:{}", s.host, s.port))
832 .collect();
833 url.push_str(&sentinel_hosts.join(","));
834
835 url.push('/');
836 url.push_str(&self.name);
837 url.push('/');
838 url.push_str(&self.db.to_string());
839
840 let mut params = Vec::new();
841 if let Some(ref password) = self.password {
842 params.push(format!("password={}", urlencoding::encode(password)));
843 }
844 if let Some(ref username) = self.username {
845 params.push(format!("username={}", urlencoding::encode(username)));
846 }
847
848 if !params.is_empty() {
849 url.push('?');
850 url.push_str(¶ms.join("&"));
851 }
852
853 url
854 }
855
856 pub fn has_cluster_nodes(&self) -> bool {
859 !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
860 }
861
862 pub fn cluster_node_urls(&self) -> Vec<String> {
865 if !self.cluster.nodes.is_empty() {
866 return self.build_cluster_urls(&self.cluster.nodes);
867 }
868 self.build_cluster_urls(&self.cluster_nodes)
869 }
870
871 pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
874 self.build_cluster_urls(
875 &seeds
876 .iter()
877 .filter_map(|seed| ClusterNode::from_seed(seed))
878 .collect::<Vec<ClusterNode>>(),
879 )
880 }
881
882 fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
883 let username = self
884 .cluster
885 .username
886 .as_deref()
887 .or(self.username.as_deref());
888 let password = self
889 .cluster
890 .password
891 .as_deref()
892 .or(self.password.as_deref());
893 let use_tls = self.cluster.use_tls;
894
895 nodes
896 .iter()
897 .map(|node| node.to_url_with_options(use_tls, username, password))
898 .collect()
899 }
900}
901
902impl RedisSentinel {
903 pub fn to_host_port(&self) -> String {
904 format!("{}:{}", self.host, self.port)
905 }
906}
907
908#[derive(Debug, Clone, Serialize, Deserialize)]
909#[serde(default)]
910pub struct ClusterNode {
911 pub host: String,
912 pub port: u16,
913}
914
915impl ClusterNode {
916 pub fn to_url(&self) -> String {
917 self.to_url_with_options(false, None, None)
918 }
919
920 pub fn to_url_with_options(
921 &self,
922 use_tls: bool,
923 username: Option<&str>,
924 password: Option<&str>,
925 ) -> String {
926 let host = self.host.trim();
927
928 if host.starts_with("redis://") || host.starts_with("rediss://") {
929 if let Ok(parsed) = Url::parse(host)
930 && let Some(host_str) = parsed.host_str()
931 {
932 let scheme = parsed.scheme();
933 let port = parsed.port_or_known_default().unwrap_or(self.port);
934 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
935 let parsed_password = parsed.password();
936 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
937 let (effective_username, effective_password) = if has_embedded_auth {
938 (parsed_username, parsed_password)
939 } else {
940 (username, password)
941 };
942
943 return build_redis_url(
944 scheme,
945 host_str,
946 port,
947 effective_username,
948 effective_password,
949 );
950 }
951
952 let has_port = if let Some(bracket_pos) = host.rfind(']') {
954 host[bracket_pos..].contains(':')
955 } else {
956 host.split(':').count() >= 3
957 };
958 let base = if has_port {
959 host.to_string()
960 } else {
961 format!("{}:{}", host, self.port)
962 };
963
964 if let Ok(parsed) = Url::parse(&base) {
965 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
966 let parsed_password = parsed.password();
967 if let Some(host_str) = parsed.host_str() {
968 let port = parsed.port_or_known_default().unwrap_or(self.port);
969 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
970 let (effective_username, effective_password) = if has_embedded_auth {
971 (parsed_username, parsed_password)
972 } else {
973 (username, password)
974 };
975 return build_redis_url(
976 parsed.scheme(),
977 host_str,
978 port,
979 effective_username,
980 effective_password,
981 );
982 }
983 }
984 return base;
985 }
986
987 let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
988 let scheme = if use_tls { "rediss" } else { "redis" };
989 build_redis_url(
990 scheme,
991 &normalized_host,
992 normalized_port,
993 username,
994 password,
995 )
996 }
997
998 pub fn from_seed(seed: &str) -> Option<Self> {
999 let trimmed = seed.trim();
1000 if trimmed.is_empty() {
1001 return None;
1002 }
1003
1004 if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
1005 let port = Url::parse(trimmed)
1006 .ok()
1007 .and_then(|parsed| parsed.port_or_known_default())
1008 .unwrap_or(6379);
1009 return Some(Self {
1010 host: trimmed.to_string(),
1011 port,
1012 });
1013 }
1014
1015 let (host, port) = split_plain_host_and_port(trimmed, 6379);
1016 Some(Self { host, port })
1017 }
1018}
1019
/// Splits a scheme-less address into `(host, port)`.
///
/// * `[v6]:port` and `[v6]` yield the address without brackets; the port falls
///   back to `default_port` when missing or unparsable.
/// * An unterminated `[` returns the trimmed input unchanged with `default_port`.
/// * `host:port` (exactly one colon, numeric port) is split at the colon.
/// * Anything else (no colon, several colons, non-numeric port) is returned
///   unchanged with `default_port`.
fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
    let trimmed = raw_host.trim();

    // Bracketed form, typically an IPv6 literal.
    if let Some(after_open) = trimmed.strip_prefix('[') {
        return match after_open.split_once(']') {
            Some((inner, tail)) => {
                let port = tail
                    .strip_prefix(':')
                    .and_then(|text| text.parse::<u16>().ok())
                    .unwrap_or(default_port);
                (inner.to_string(), port)
            }
            None => (trimmed.to_string(), default_port),
        };
    }

    // Exactly two ':'-separated pieces means exactly one colon.
    let mut pieces = trimmed.split(':');
    if let (Some(name), Some(port_text), None) = (pieces.next(), pieces.next(), pieces.next()) {
        if let Ok(port) = port_text.parse::<u16>() {
            return (name.to_string(), port);
        }
    }

    (trimmed.to_string(), default_port)
}
1048
1049fn build_redis_url(
1050 scheme: &str,
1051 host: &str,
1052 port: u16,
1053 username: Option<&str>,
1054 password: Option<&str>,
1055) -> String {
1056 let mut url = format!("{scheme}://");
1057
1058 if let Some(user) = username {
1059 url.push_str(&urlencoding::encode(user));
1060 if let Some(pass) = password {
1061 url.push(':');
1062 url.push_str(&urlencoding::encode(pass));
1063 }
1064 url.push('@');
1065 } else if let Some(pass) = password {
1066 url.push(':');
1067 url.push_str(&urlencoding::encode(pass));
1068 url.push('@');
1069 }
1070
1071 if host.contains(':') && !host.starts_with('[') {
1072 url.push('[');
1073 url.push_str(host);
1074 url.push(']');
1075 } else {
1076 url.push_str(host);
1077 }
1078 url.push(':');
1079 url.push_str(&port.to_string());
1080 url
1081}
1082
1083#[derive(Debug, Clone, Serialize, Deserialize)]
1084#[serde(default)]
1085pub struct DatabasePooling {
1086 pub enabled: bool,
1087 pub min: u32,
1088 pub max: u32,
1089}
1090
1091#[derive(Debug, Clone, Serialize, Deserialize)]
1092#[serde(default)]
1093pub struct EventLimits {
1094 pub max_channels_at_once: u32,
1095 pub max_name_length: u32,
1096 pub max_payload_in_kb: u32,
1097 pub max_batch_size: u32,
1098}
1099
1100#[derive(Debug, Clone, Serialize, Deserialize)]
1101#[serde(default)]
1102pub struct IdempotencyConfig {
1103 pub enabled: bool,
1105 pub ttl_seconds: u64,
1107 pub max_key_length: usize,
1109}
1110
1111#[derive(Debug, Clone, Serialize, Deserialize)]
1112#[serde(default)]
1113pub struct EphemeralConfig {
1114 pub enabled: bool,
1116}
1117
1118#[derive(Debug, Clone, Serialize, Deserialize)]
1119#[serde(default)]
1120pub struct EchoControlConfig {
1121 pub enabled: bool,
1123 pub default_echo_messages: bool,
1125}
1126
1127#[derive(Debug, Clone, Serialize, Deserialize)]
1128#[serde(default)]
1129pub struct EventNameFilteringConfig {
1130 pub enabled: bool,
1132 pub max_events_per_filter: usize,
1134 pub max_event_name_length: usize,
1136}
1137
1138#[derive(Debug, Clone, Serialize, Deserialize)]
1139#[serde(default)]
1140pub struct ConnectionRecoveryConfig {
1141 pub enabled: bool,
1146 pub buffer_ttl_seconds: u64,
1148 pub max_buffer_size: usize,
1150}
1151
1152#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
1153#[serde(rename_all = "lowercase")]
1154pub enum VersionStoreDriver {
1155 #[default]
1156 Memory,
1157 Postgres,
1158 Mysql,
1159 DynamoDb,
1160 ScyllaDb,
1161 SurrealDb,
1162}
1163
1164impl FromStr for VersionStoreDriver {
1165 type Err = String;
1166 fn from_str(s: &str) -> Result<Self, Self::Err> {
1167 match s.to_lowercase().as_str() {
1168 "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1169 "mysql" => Ok(Self::Mysql),
1170 "dynamodb" => Ok(Self::DynamoDb),
1171 "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1172 "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1173 "memory" => Ok(Self::Memory),
1174 _ => Err(format!("Unknown version store driver: {s}")),
1175 }
1176 }
1177}
1178
1179#[derive(Debug, Clone, Serialize, Deserialize)]
1180#[serde(default)]
1181pub struct VersionedMessagesConfig {
1182 pub enabled: bool,
1184 pub driver: VersionStoreDriver,
1186 pub max_page_size: usize,
1188 pub retention_window_seconds: u64,
1196 pub purge_interval_seconds: u64,
1199 pub purge_batch_size: usize,
1203 pub max_purge_per_tick: usize,
1206}
1207
1208#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1209#[serde(default)]
1210pub struct AnnotationsConfig {
1211 pub enabled: bool,
1214}
1215
/// Backend selection used by `HistoryConfig::backend`.
///
/// Serde writes and reads the variants in lowercase (`postgres`, `mysql`,
/// `dynamodb`, `surrealdb`, `scylladb`, `memory`). `Postgres` is the `Default`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum HistoryBackend {
    #[default]
    Postgres,
    Mysql,
    DynamoDb,
    SurrealDb,
    ScyllaDb,
    Memory,
}
1227
1228impl FromStr for HistoryBackend {
1229 type Err = String;
1230 fn from_str(s: &str) -> Result<Self, Self::Err> {
1231 match s.to_lowercase().as_str() {
1232 "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1233 "mysql" => Ok(Self::Mysql),
1234 "dynamodb" => Ok(Self::DynamoDb),
1235 "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1236 "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1237 "memory" => Ok(Self::Memory),
1238 _ => Err(format!("Unknown history backend: {s}")),
1239 }
1240 }
1241}
1242
/// PostgreSQL-specific history settings (the `postgres` field of `HistoryConfig`).
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PostgresHistoryConfig {
    /// Default: `"sockudo_history"`.
    pub table_prefix: String,
    /// Write timeout in milliseconds. Default: `5000`.
    pub write_timeout_ms: u64,
}
1249
1250impl Default for PostgresHistoryConfig {
1251 fn default() -> Self {
1252 Self {
1253 table_prefix: "sockudo_history".to_string(),
1254 write_timeout_ms: 5000,
1255 }
1256 }
1257}
1258
/// MySQL-specific history settings (the `mysql` field of `HistoryConfig`).
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MySqlHistoryConfig {
    /// Default: `"sockudo_history"`.
    pub table_prefix: String,
    /// Write timeout in milliseconds. Default: `5000`.
    pub write_timeout_ms: u64,
}
1265
1266impl Default for MySqlHistoryConfig {
1267 fn default() -> Self {
1268 Self {
1269 table_prefix: "sockudo_history".to_string(),
1270 write_timeout_ms: 5000,
1271 }
1272 }
1273}
1274
/// DynamoDB-specific history settings (the `dynamodb` field of `HistoryConfig`).
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DynamoDbHistoryConfig {
    /// Default: `"sockudo_history"`.
    pub table_prefix: String,
    /// Write timeout in milliseconds. Default: `5000`.
    pub write_timeout_ms: u64,
}
1281
1282impl Default for DynamoDbHistoryConfig {
1283 fn default() -> Self {
1284 Self {
1285 table_prefix: "sockudo_history".to_string(),
1286 write_timeout_ms: 5000,
1287 }
1288 }
1289}
1290
/// SurrealDB-specific history settings (the `surrealdb` field of `HistoryConfig`).
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SurrealDbHistoryConfig {
    /// Default: `"sockudo_history"`.
    pub table_prefix: String,
    /// Write timeout in milliseconds. Default: `5000`.
    pub write_timeout_ms: u64,
}
1297
1298impl Default for SurrealDbHistoryConfig {
1299 fn default() -> Self {
1300 Self {
1301 table_prefix: "sockudo_history".to_string(),
1302 write_timeout_ms: 5000,
1303 }
1304 }
1305}
1306
/// ScyllaDB-specific history settings (the `scylladb` field of `HistoryConfig`).
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ScyllaDbHistoryConfig {
    /// Default: `"sockudo_history"`.
    pub table_prefix: String,
    /// Write timeout in milliseconds. Default: `5000`.
    pub write_timeout_ms: u64,
}
1313
1314impl Default for ScyllaDbHistoryConfig {
1315 fn default() -> Self {
1316 Self {
1317 table_prefix: "sockudo_history".to_string(),
1318 write_timeout_ms: 5000,
1319 }
1320 }
1321}
1322
/// Settings for message history.
///
/// Because of `#[serde(default)]`, any field missing from the input is taken
/// from this type's `Default` impl. Field meanings marked "presumably" are
/// inferred from the field names and should be confirmed at the usage site.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HistoryConfig {
    /// Feature toggle. Default: `false`.
    pub enabled: bool,
    /// Default: `true`.
    pub rewind_enabled: bool,
    /// Selected storage backend. Default: `HistoryBackend::Postgres`.
    pub backend: HistoryBackend,
    /// Presumably how long entries are retained, in seconds. Default: `86400`.
    pub retention_window_seconds: u64,
    /// Presumably the largest page a query may return. Default: `100`.
    pub max_page_size: usize,
    /// Optional per-channel message cap. Default: `None`.
    pub max_messages_per_channel: Option<usize>,
    /// Optional per-channel byte cap. Default: `None`.
    pub max_bytes_per_channel: Option<u64>,
    /// Default: `16`.
    pub writer_shards: usize,
    /// Default: `4096`.
    pub writer_queue_capacity: usize,
    /// Presumably the delay between purge runs, in seconds. Default: `300`.
    pub purge_interval_seconds: u64,
    /// Default: `1000`.
    pub purge_batch_size: usize,
    /// Default: `100_000`.
    pub max_purge_per_tick: usize,
    /// Backend-specific settings for PostgreSQL.
    pub postgres: PostgresHistoryConfig,
    /// Backend-specific settings for MySQL.
    pub mysql: MySqlHistoryConfig,
    /// Backend-specific settings for DynamoDB.
    pub dynamodb: DynamoDbHistoryConfig,
    /// Backend-specific settings for SurrealDB.
    pub surrealdb: SurrealDbHistoryConfig,
    /// Backend-specific settings for ScyllaDB.
    pub scylladb: ScyllaDbHistoryConfig,
}
1344
1345impl Default for HistoryConfig {
1346 fn default() -> Self {
1347 Self {
1348 enabled: false,
1349 rewind_enabled: true,
1350 backend: HistoryBackend::Postgres,
1351 retention_window_seconds: 86400,
1352 max_page_size: 100,
1353 max_messages_per_channel: None,
1354 max_bytes_per_channel: None,
1355 writer_shards: 16,
1356 writer_queue_capacity: 4096,
1357 purge_interval_seconds: 300,
1358 purge_batch_size: 1000,
1359 max_purge_per_tick: 100_000,
1360 postgres: PostgresHistoryConfig::default(),
1361 mysql: MySqlHistoryConfig::default(),
1362 dynamodb: DynamoDbHistoryConfig::default(),
1363 surrealdb: SurrealDbHistoryConfig::default(),
1364 scylladb: ScyllaDbHistoryConfig::default(),
1365 }
1366 }
1367}
1368
1369impl Default for VersionedMessagesConfig {
1370 fn default() -> Self {
1371 Self {
1372 enabled: false,
1373 driver: VersionStoreDriver::Memory,
1374 max_page_size: 100,
1375 retention_window_seconds: 0,
1376 purge_interval_seconds: 300,
1377 purge_batch_size: 1000,
1378 max_purge_per_tick: 100_000,
1379 }
1380 }
1381}
1382
/// Storage driver selection used by `PushConfig::storage_driver`.
///
/// Serde writes and reads the variants in lowercase (`memory`, `postgres`,
/// `mysql`, `dynamodb`, `surrealdb`, `scylladb`). `Memory` is the `Default`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum PushStorageDriver {
    #[default]
    Memory,
    Postgres,
    Mysql,
    DynamoDb,
    SurrealDb,
    ScyllaDb,
}
1394
1395impl FromStr for PushStorageDriver {
1396 type Err = String;
1397 fn from_str(s: &str) -> Result<Self, Self::Err> {
1398 match s.to_lowercase().as_str() {
1399 "memory" => Ok(Self::Memory),
1400 "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1401 "mysql" => Ok(Self::Mysql),
1402 "dynamodb" => Ok(Self::DynamoDb),
1403 "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1404 "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1405 _ => Err(format!("Unknown push storage driver: {s}")),
1406 }
1407 }
1408}
1409
/// Queue driver selection used by `PushConfig::queue_driver`.
///
/// Serde names are lowercase, except for the two variants with an explicit
/// rename: `redis-cluster` and `google-pubsub`. `Memory` is the `Default`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum PushQueueDriver {
    #[default]
    Memory,
    Redis,
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    Nats,
    Pulsar,
    RabbitMq,
    #[serde(rename = "google-pubsub")]
    GooglePubsub,
    Kafka,
    Iggy,
    Sqs,
    Sns,
}
1428
1429impl FromStr for PushQueueDriver {
1430 type Err = String;
1431 fn from_str(s: &str) -> Result<Self, Self::Err> {
1432 match s.to_lowercase().as_str() {
1433 "memory" => Ok(Self::Memory),
1434 "redis" => Ok(Self::Redis),
1435 "redis-cluster" | "redis_cluster" => Ok(Self::RedisCluster),
1436 "nats" => Ok(Self::Nats),
1437 "pulsar" => Ok(Self::Pulsar),
1438 "rabbitmq" | "rabbit-mq" => Ok(Self::RabbitMq),
1439 "google-pubsub" | "google_pubsub" | "gcp-pubsub" | "pubsub" => Ok(Self::GooglePubsub),
1440 "kafka" => Ok(Self::Kafka),
1441 "iggy" | "apache-iggy" | "apache_iggy" => Ok(Self::Iggy),
1442 "sqs" => Ok(Self::Sqs),
1443 "sns" => Ok(Self::Sns),
1444 _ => Err(format!("Unknown push queue driver: {s}")),
1445 }
1446 }
1447}
1448
/// Settings for push notifications.
///
/// Because of `#[serde(default)]`, any field missing from the input is taken
/// from this type's `Default` impl; the defaults quoted below come from that
/// impl. Field meanings are not spelled out here because only names and types
/// are visible in this section; confirm semantics at the usage sites.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PushConfig {
    /// Default: `PushStorageDriver::Memory`.
    pub storage_driver: PushStorageDriver,
    /// Default: `PushQueueDriver::Memory`.
    pub queue_driver: PushQueueDriver,
    // Per-provider toggles; all default to `false`.
    pub fcm_enabled: bool,
    pub apns_enabled: bool,
    pub webpush_enabled: bool,
    pub hms_enabled: bool,
    pub wns_enabled: bool,
    // Per-provider credential references; all default to `None`.
    pub fcm_credential_ref: Option<String>,
    pub apns_credential_ref: Option<String>,
    pub webpush_credential_ref: Option<String>,
    pub hms_credential_ref: Option<String>,
    pub wns_credential_ref: Option<String>,
    // Worker, partition and shard counts; all default to `1`.
    pub accept_worker_count: u32,
    pub planner_worker_count: u32,
    pub shard_worker_count: u32,
    pub dispatch_worker_count: u32,
    pub feedback_worker_count: u32,
    pub queue_partition_count: u32,
    pub channel_shard_count: u32,
    /// Default: `10_000`.
    pub fanout_fast_threshold: u64,
    /// Default: `100_000`.
    pub fanout_shard_size: u64,
    /// Default: `0`.
    pub fanout_sync_threshold: u64,
    /// Default: `60`.
    pub backpressure_lag_threshold_secs: u64,
    /// Default: `30`.
    pub publish_status_ttl_days: u64,
    /// Default: `90`.
    pub stale_device_max_age_days: u64,
    /// Retry settings; defaults to `PushRetryConfig::default()`.
    pub retry: PushRetryConfig,
    /// Circuit breaker settings; defaults to `PushCircuitBreakerConfig::default()`.
    pub circuit_breaker: PushCircuitBreakerConfig,
    /// Quota settings; defaults to `PushDefaultQuotas::default()`.
    pub default_quotas: PushDefaultQuotas,
    /// Default: `None`.
    pub credential_encryption_key: Option<String>,
    /// Default: `None`.
    pub kms_key_ref: Option<String>,
    /// Default: `None`.
    pub vault_secret_ref: Option<String>,
    /// Default: `false`.
    pub dry_run: bool,
    /// Default: `false`.
    pub analytics_enabled: bool,
    /// Default: `30`.
    pub analytics_retention_days: u64,
    /// Redaction settings; defaults to `PushPayloadRedactionConfig::default()`.
    pub payload_redaction: PushPayloadRedactionConfig,
    /// Default: `5`.
    pub scheduler_interval_secs: u64,
}
1489
1490impl Default for PushConfig {
1491 fn default() -> Self {
1492 Self {
1493 storage_driver: PushStorageDriver::Memory,
1494 queue_driver: PushQueueDriver::Memory,
1495 fcm_enabled: false,
1496 apns_enabled: false,
1497 webpush_enabled: false,
1498 hms_enabled: false,
1499 wns_enabled: false,
1500 fcm_credential_ref: None,
1501 apns_credential_ref: None,
1502 webpush_credential_ref: None,
1503 hms_credential_ref: None,
1504 wns_credential_ref: None,
1505 accept_worker_count: 1,
1506 planner_worker_count: 1,
1507 shard_worker_count: 1,
1508 dispatch_worker_count: 1,
1509 feedback_worker_count: 1,
1510 queue_partition_count: 1,
1511 channel_shard_count: 1,
1512 fanout_fast_threshold: 10_000,
1513 fanout_shard_size: 100_000,
1514 fanout_sync_threshold: 0,
1515 backpressure_lag_threshold_secs: 60,
1516 publish_status_ttl_days: 30,
1517 stale_device_max_age_days: 90,
1518 retry: PushRetryConfig::default(),
1519 circuit_breaker: PushCircuitBreakerConfig::default(),
1520 default_quotas: PushDefaultQuotas::default(),
1521 credential_encryption_key: None,
1522 kms_key_ref: None,
1523 vault_secret_ref: None,
1524 dry_run: false,
1525 analytics_enabled: false,
1526 analytics_retention_days: 30,
1527 payload_redaction: PushPayloadRedactionConfig::default(),
1528 scheduler_interval_secs: 5,
1529 }
1530 }
1531}
1532
/// Retry settings for push delivery (the `retry` field of `PushConfig`).
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PushRetryConfig {
    /// Default: `5`.
    pub max_attempts: u32,
    /// Milliseconds. Default: `1_000`.
    pub initial_backoff_ms: u64,
    /// Milliseconds. Default: `60_000`.
    pub max_backoff_ms: u64,
    /// Seconds. Default: `86_400`.
    pub max_elapsed_secs: u64,
    /// Default: `true`.
    pub jitter: bool,
    /// Default: `true`.
    pub respect_retry_after: bool,
}
1543
1544impl Default for PushRetryConfig {
1545 fn default() -> Self {
1546 Self {
1547 max_attempts: 5,
1548 initial_backoff_ms: 1_000,
1549 max_backoff_ms: 60_000,
1550 max_elapsed_secs: 86_400,
1551 jitter: true,
1552 respect_retry_after: true,
1553 }
1554 }
1555}
1556
/// Circuit breaker settings for push delivery (the `circuit_breaker` field of
/// `PushConfig`).
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PushCircuitBreakerConfig {
    /// Default: `5`.
    pub failure_threshold: u32,
    /// Seconds. Default: `60`.
    pub cooldown_secs: u64,
    /// Default: `10`.
    pub half_open_max_inflight: u32,
}
1564
1565impl Default for PushCircuitBreakerConfig {
1566 fn default() -> Self {
1567 Self {
1568 failure_threshold: 5,
1569 cooldown_secs: 60,
1570 half_open_max_inflight: 10,
1571 }
1572 }
1573}
1574
/// Default push quotas (the `default_quotas` field of `PushConfig`).
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
/// NOTE(review): two defaults are `0`; whether `0` means "unlimited" is not
/// visible in this section, so confirm against the quota enforcement code.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PushDefaultQuotas {
    /// Default: `100`.
    pub acceptance_rps: u64,
    /// Default: `0`.
    pub delivery_quota_daily: u64,
    /// Default: `0`.
    pub fanout_max: u64,
    /// Default: `1_000`.
    pub inflight_max: u64,
}
1583
1584impl Default for PushDefaultQuotas {
1585 fn default() -> Self {
1586 Self {
1587 acceptance_rps: 100,
1588 delivery_quota_daily: 0,
1589 fanout_max: 0,
1590 inflight_max: 1_000,
1591 }
1592 }
1593}
1594
/// Payload redaction settings for push (the `payload_redaction` field of
/// `PushConfig`).
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`),
/// which turns all three redaction flags on and debug payload logging off.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PushPayloadRedactionConfig {
    /// Default: `true`.
    pub redact_payload: bool,
    /// Default: `true`.
    pub redact_template_data: bool,
    /// Default: `true`.
    pub redact_provider_overrides: bool,
    /// Default: `false`.
    pub allow_debug_payload_logging: bool,
}
1603
1604impl Default for PushPayloadRedactionConfig {
1605 fn default() -> Self {
1606 Self {
1607 redact_payload: true,
1608 redact_template_data: true,
1609 redact_provider_overrides: true,
1610 allow_debug_payload_logging: false,
1611 }
1612 }
1613}
1614
/// Settings for presence history.
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PresenceHistoryConfig {
    /// Feature toggle. Default: `false`.
    pub enabled: bool,
    /// Seconds. Default: `86400`.
    pub retention_window_seconds: u64,
    /// Default: `100`.
    pub max_page_size: usize,
    /// Optional per-channel event cap. Default: `None`.
    pub max_events_per_channel: Option<usize>,
    /// Optional per-channel byte cap. Default: `None`.
    pub max_bytes_per_channel: Option<u64>,
}
1624
1625impl Default for PresenceHistoryConfig {
1626 fn default() -> Self {
1627 Self {
1628 enabled: false,
1629 retention_window_seconds: 86400,
1630 max_page_size: 100,
1631 max_events_per_channel: None,
1632 max_bytes_per_channel: None,
1633 }
1634 }
1635}
1636
/// Settings for the HTTP API.
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HttpApiConfig {
    /// Request size limit in megabytes. Default: `10`.
    pub request_limit_in_mb: u32,
    /// Traffic acceptance settings; defaults to `AcceptTraffic::default()`.
    pub accept_traffic: AcceptTraffic,
    /// Default: `true`.
    pub usage_enabled: bool,
}
1644
/// Traffic acceptance settings (the `accept_traffic` field of `HttpApiConfig`).
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AcceptTraffic {
    /// Default: `0.90`. Presumably a fraction of memory in use (inferred from
    /// the name and the default; confirm at the usage site).
    pub memory_threshold: f64,
}
1650
/// Identity of this server instance.
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct InstanceConfig {
    /// Defaults to a freshly generated UUID v4 string, so every call to
    /// `InstanceConfig::default()` yields a different value.
    pub process_id: String,
}
1656
/// Settings for metrics.
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MetricsConfig {
    /// Feature toggle. Default: `true`.
    pub enabled: bool,
    /// Defaults to `MetricsDriver::default()`.
    pub driver: MetricsDriver,
    /// Default: `"0.0.0.0"`.
    pub host: String,
    /// Prometheus-specific settings.
    pub prometheus: PrometheusConfig,
    /// TCP exporter settings.
    pub tcp_exporter: MetricsTcpExporterConfig,
    /// Default: `9601`.
    pub port: u16,
}
1667
/// Prometheus-specific metrics settings (the `prometheus` field of `MetricsConfig`).
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PrometheusConfig {
    /// Default: `"sockudo_"`.
    pub prefix: String,
}
1673
/// TCP exporter settings for metrics (the `tcp_exporter` field of `MetricsConfig`).
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MetricsTcpExporterConfig {
    /// Feature toggle. Default: `false`.
    pub enabled: bool,
    /// Default: `"127.0.0.1"`.
    pub host: String,
    /// Default: `5000`.
    pub port: u16,
    /// Default: `Some(1024)`.
    pub buffer_size: Option<usize>,
}
1682
/// Settings for log output.
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingConfig {
    /// Default: `true`.
    pub colors_enabled: bool,
    /// Default: `true`.
    pub include_target: bool,
}
1689
/// Limits for presence channels.
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PresenceConfig {
    /// Default: `100`.
    pub max_members_per_channel: u32,
    /// Kilobytes. Default: `2`.
    pub max_member_size_in_kb: u32,
}
1696
/// WebSocket buffering and transport settings.
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
/// `to_buffer_config` reads the first three fields; `to_sockudo_ws_config`
/// reads the rest, plus `max_bytes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebSocketConfig {
    /// Buffer limit in messages. Default: `Some(1000)`.
    pub max_messages: Option<usize>,
    /// Buffer limit in bytes. Default: `None`. When set, `to_sockudo_ws_config`
    /// also uses it as the maximum payload length.
    pub max_bytes: Option<usize>,
    /// Default: `true`.
    pub disconnect_on_buffer_full: bool,
    /// Default: `64 * 1024 * 1024`.
    pub max_message_size: usize,
    /// Default: `16 * 1024 * 1024`.
    pub max_frame_size: usize,
    /// Default: `16 * 1024`.
    pub write_buffer_size: usize,
    /// Default: `1024 * 1024`.
    pub max_backpressure: usize,
    /// Default: `true`.
    pub auto_ping: bool,
    /// Default: `30`.
    pub ping_interval: u32,
    /// Default: `120`.
    pub idle_timeout: u32,
    /// Compression mode name, matched case-insensitively by
    /// `to_sockudo_ws_config`: `dedicated`, `shared`, `window256b`,
    /// `window1kb`, `window2kb`, `window4kb`, `window8kb`, `window16kb`,
    /// `window32kb`. Any other value disables compression.
    /// Default: `"disabled"`.
    pub compression: String,
}
1714
1715impl Default for WebSocketConfig {
1716 fn default() -> Self {
1717 Self {
1718 max_messages: Some(1000),
1719 max_bytes: None,
1720 disconnect_on_buffer_full: true,
1721 max_message_size: 64 * 1024 * 1024,
1722 max_frame_size: 16 * 1024 * 1024,
1723 write_buffer_size: 16 * 1024,
1724 max_backpressure: 1024 * 1024,
1725 auto_ping: true,
1726 ping_interval: 30,
1727 idle_timeout: 120,
1728 compression: "disabled".to_string(),
1729 }
1730 }
1731}
1732
1733impl WebSocketConfig {
1734 pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
1736 use crate::websocket::{BufferLimit, WebSocketBufferConfig};
1737
1738 let limit = match (self.max_messages, self.max_bytes) {
1739 (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
1740 (Some(messages), None) => BufferLimit::Messages(messages),
1741 (None, Some(bytes)) => BufferLimit::Bytes(bytes),
1742 (None, None) => BufferLimit::Messages(1000),
1743 };
1744
1745 WebSocketBufferConfig {
1746 limit,
1747 disconnect_on_full: self.disconnect_on_buffer_full,
1748 }
1749 }
1750
1751 pub fn to_sockudo_ws_config(
1753 &self,
1754 websocket_max_payload_kb: u32,
1755 activity_timeout: u64,
1756 ) -> sockudo_ws::Config {
1757 use sockudo_ws::Compression;
1758
1759 let compression = match self.compression.to_lowercase().as_str() {
1760 "dedicated" => Compression::Dedicated,
1761 "shared" => Compression::Shared,
1762 "window256b" => Compression::Window256B,
1763 "window1kb" => Compression::Window1KB,
1764 "window2kb" => Compression::Window2KB,
1765 "window4kb" => Compression::Window4KB,
1766 "window8kb" => Compression::Window8KB,
1767 "window16kb" => Compression::Window16KB,
1768 "window32kb" => Compression::Window32KB,
1769 _ => Compression::Disabled,
1770 };
1771
1772 sockudo_ws::Config::builder()
1773 .max_payload_length(
1774 self.max_bytes
1775 .unwrap_or(websocket_max_payload_kb as usize * 1024),
1776 )
1777 .max_message_size(self.max_message_size)
1778 .max_frame_size(self.max_frame_size)
1779 .write_buffer_size(self.write_buffer_size)
1780 .max_backpressure(self.max_backpressure)
1781 .idle_timeout(self.idle_timeout)
1782 .auto_ping(self.auto_ping)
1783 .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1784 .compression(compression)
1785 .build()
1786 }
1787}
1788
/// Queue settings: the selected driver plus one settings section per driver.
///
/// `Default` is derived, so every field starts at its own type's default; the
/// same values fill in fields missing from the input (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct QueueConfig {
    /// Selected queue driver.
    pub driver: QueueDriver,
    pub redis: RedisQueueConfig,
    pub redis_cluster: RedisClusterQueueConfig,
    pub nats: NatsAdapterConfig,
    pub pulsar: PulsarAdapterConfig,
    pub rabbitmq: RabbitMqAdapterConfig,
    pub google_pubsub: GooglePubSubAdapterConfig,
    pub kafka: KafkaAdapterConfig,
    pub iggy: IggyConfig,
    pub sqs: SqsQueueConfig,
    pub sns: SnsQueueConfig,
}
1804
/// Redis queue settings (the `redis` field of `QueueConfig`).
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisQueueConfig {
    /// Default: `5`.
    pub concurrency: u32,
    /// Default: `Some("sockudo_queue:")`.
    pub prefix: Option<String>,
    /// Default: `None`.
    pub url_override: Option<String>,
    /// Default: `false`.
    pub cluster_mode: bool,
}
1813
/// A single rate limit rule, used for both the API and WebSocket limits in
/// `RateLimiterConfig`.
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct RateLimit {
    /// Presumably the number of requests allowed per window (inferred from
    /// the name; confirm at the usage site).
    pub max_requests: u32,
    /// Window length in seconds.
    pub window_seconds: u64,
    /// Optional identifier. NOTE(review): its meaning is not visible in this
    /// section.
    pub identifier: Option<String>,
    /// Optional hop count. NOTE(review): presumably the number of trusted
    /// proxy hops; confirm at the usage site.
    pub trust_hops: Option<u32>,
}
1822
/// Rate limiter settings.
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RateLimiterConfig {
    /// Feature toggle.
    pub enabled: bool,
    /// Backing store selection; note this reuses the `CacheDriver` type.
    pub driver: CacheDriver,
    /// Limit applied to the API.
    pub api_rate_limit: RateLimit,
    /// Limit applied to WebSocket traffic.
    pub websocket_rate_limit: RateLimit,
    /// Redis settings for the rate limiter.
    pub redis: RedisConfig,
}
1832
/// TLS settings.
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SslConfig {
    /// Feature toggle.
    pub enabled: bool,
    /// Path to the certificate file.
    pub cert_path: String,
    /// Path to the private key file.
    pub key_path: String,
    /// Optional passphrase.
    pub passphrase: Option<String>,
    /// Optional path to a CA file.
    pub ca_path: Option<String>,
    /// Presumably redirects plain HTTP to HTTPS when set (inferred from the
    /// name; confirm at the usage site).
    pub redirect_http: bool,
    /// Optional plain HTTP port.
    pub http_port: Option<u16>,
}
1844
/// Webhook delivery settings.
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebhooksConfig {
    /// Batching settings; defaults to `BatchingConfig::default()`.
    pub batching: BatchingConfig,
    /// Retry settings; defaults to `WebhookRetryConfig::default()`.
    pub retry: WebhookRetryConfig,
    /// Milliseconds. Default: `10_000`.
    pub request_timeout_ms: u64,
}
1852
/// Webhook batching settings (the `batching` field of `WebhooksConfig`).
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct BatchingConfig {
    /// Feature toggle.
    pub enabled: bool,
    /// NOTE(review): the unit is not visible in this section; confirm at the
    /// usage site.
    pub duration: u64,
    /// Presumably the maximum batch size (inferred from the name).
    pub size: usize,
}
1860
/// Webhook retry settings (the `retry` field of `WebhooksConfig`).
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebhookRetryConfig {
    /// Feature toggle. Default: `true`.
    pub enabled: bool,
    /// Optional attempt cap. Default: `None`.
    pub max_attempts: Option<u32>,
    /// Milliseconds. Default: `300_000`.
    pub max_elapsed_time_ms: u64,
    /// Milliseconds. Default: `1_000`.
    pub initial_backoff_ms: u64,
    /// Milliseconds. Default: `60_000`.
    pub max_backoff_ms: u64,
}
1870
1871impl Default for WebhooksConfig {
1872 fn default() -> Self {
1873 Self {
1874 batching: BatchingConfig::default(),
1875 retry: WebhookRetryConfig::default(),
1876 request_timeout_ms: 10_000,
1877 }
1878 }
1879}
1880
1881impl Default for WebhookRetryConfig {
1882 fn default() -> Self {
1883 Self {
1884 enabled: true,
1885 max_attempts: None,
1886 max_elapsed_time_ms: 300_000,
1887 initial_backoff_ms: 1_000,
1888 max_backoff_ms: 60_000,
1889 }
1890 }
1891}
1892
/// Cluster health settings; all durations are in milliseconds.
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ClusterHealthConfig {
    /// Feature toggle.
    pub enabled: bool,
    /// Heartbeat interval in milliseconds.
    pub heartbeat_interval_ms: u64,
    /// Node timeout in milliseconds.
    pub node_timeout_ms: u64,
    /// Cleanup interval in milliseconds.
    pub cleanup_interval_ms: u64,
}
1901
/// Unix domain socket settings.
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct UnixSocketConfig {
    /// Feature toggle.
    pub enabled: bool,
    /// Socket path.
    pub path: String,
    /// Permission mode. On input it is read by `deserialize_octal_permission`,
    /// which expects a string of octal digits no greater than `777`.
    ///
    /// NOTE(review): there is no matching `serialize_with`, so the derived
    /// `Serialize` writes this as a plain integer. Output may therefore not be
    /// readable back by the deserializer; confirm whether round-tripping
    /// matters.
    #[serde(deserialize_with = "deserialize_octal_permission")]
    pub permission_mode: u32,
}
1910
/// Delta compression settings.
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
/// Field meanings are not described here because only names and types are
/// visible in this section; confirm semantics at the usage sites.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DeltaCompressionOptionsConfig {
    /// Feature toggle.
    pub enabled: bool,
    /// Algorithm name. NOTE(review): accepted values are not visible in this
    /// section.
    pub algorithm: String,
    pub full_message_interval: u32,
    pub min_message_size: usize,
    /// Seconds.
    pub max_state_age_secs: u64,
    pub max_channel_states_per_socket: usize,
    pub max_conflation_states_per_channel: Option<usize>,
    pub conflation_key_path: Option<String>,
    pub cluster_coordination: bool,
    pub coordination_backend: DeltaCoordinationBackend,
    pub omit_delta_algorithm: bool,
}
1926
/// Cleanup settings.
///
/// Missing fields are taken from this type's `Default` impl (`#[serde(default)]`).
/// `CleanupConfig::validate` rejects a zero `queue_buffer_size`, `batch_size`
/// or `batch_timeout_ms`, and a fixed worker count of zero.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CleanupConfig {
    /// Must be greater than 0. Default: `1024`.
    pub queue_buffer_size: usize,
    /// Must be greater than 0. Default: `64`.
    pub batch_size: usize,
    /// Milliseconds; must be greater than 0. Default: `100`.
    pub batch_timeout_ms: u64,
    /// Either automatic or a fixed count. Default: `WorkerThreadsConfig::Auto`.
    pub worker_threads: WorkerThreadsConfig,
    /// Default: `3`.
    pub max_retry_attempts: u32,
    /// Default: `true`.
    pub async_enabled: bool,
    /// Default: `true`.
    pub fallback_to_sync: bool,
}
1939
/// Worker thread count for cleanup: automatic, or an explicit number.
///
/// Serialization is hand-written: `Auto` is written as the string `"auto"` and
/// `Fixed(n)` as the integer `n`. Deserialization accepts `"auto"` in any
/// case, a numeric string, or an integer.
#[derive(Debug, Clone)]
pub enum WorkerThreadsConfig {
    /// No explicit count is configured.
    Auto,
    /// Use exactly this many worker threads.
    Fixed(usize),
}
1946
1947impl serde::Serialize for WorkerThreadsConfig {
1948 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1949 where
1950 S: serde::Serializer,
1951 {
1952 match self {
1953 WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1954 WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1955 }
1956 }
1957}
1958
1959impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1960 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1961 where
1962 D: serde::Deserializer<'de>,
1963 {
1964 use serde::de;
1965 struct WorkerThreadsVisitor;
1966 impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1967 type Value = WorkerThreadsConfig;
1968
1969 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1970 formatter.write_str(r#""auto" or a positive integer"#)
1971 }
1972
1973 fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1974 if value.eq_ignore_ascii_case("auto") {
1975 Ok(WorkerThreadsConfig::Auto)
1976 } else if let Ok(n) = value.parse::<usize>() {
1977 Ok(WorkerThreadsConfig::Fixed(n))
1978 } else {
1979 Err(E::custom(format!(
1980 "expected 'auto' or a number, got '{value}'"
1981 )))
1982 }
1983 }
1984
1985 fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1986 Ok(WorkerThreadsConfig::Fixed(value as usize))
1987 }
1988
1989 fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1990 if value >= 0 {
1991 Ok(WorkerThreadsConfig::Fixed(value as usize))
1992 } else {
1993 Err(E::custom("worker_threads must be non-negative"))
1994 }
1995 }
1996 }
1997 deserializer.deserialize_any(WorkerThreadsVisitor)
1998 }
1999}
2000
2001impl Default for CleanupConfig {
2002 fn default() -> Self {
2003 Self {
2004 queue_buffer_size: 1024,
2005 batch_size: 64,
2006 batch_timeout_ms: 100,
2007 worker_threads: WorkerThreadsConfig::Auto,
2008 max_retry_attempts: 3,
2009 async_enabled: true,
2010 fallback_to_sync: true,
2011 }
2012 }
2013}
2014
2015impl CleanupConfig {
2016 pub fn validate(&self) -> Result<(), String> {
2017 if self.queue_buffer_size == 0 {
2018 return Err("queue_buffer_size must be greater than 0".to_string());
2019 }
2020 if self.batch_size == 0 {
2021 return Err("batch_size must be greater than 0".to_string());
2022 }
2023 if self.batch_timeout_ms == 0 {
2024 return Err("batch_timeout_ms must be greater than 0".to_string());
2025 }
2026 if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
2027 && n == 0
2028 {
2029 return Err("worker_threads must be greater than 0 when using fixed count".to_string());
2030 }
2031 Ok(())
2032 }
2033}
2034
2035#[derive(Debug, Clone, Serialize, Deserialize, Default)]
2036#[serde(default)]
2037pub struct TagFilteringConfig {
2038 #[serde(default)]
2039 pub enabled: bool,
2040 #[serde(default = "default_true")]
2041 pub enable_tags: bool,
2042}
2043
2044fn default_true() -> bool {
2045 true
2046}
2047
2048impl Default for ServerOptions {
2051 fn default() -> Self {
2052 Self {
2053 adapter: AdapterConfig::default(),
2054 app_manager: AppManagerConfig::default(),
2055 cache: CacheConfig::default(),
2056 channel_limits: ChannelLimits::default(),
2057 cors: CorsConfig::default(),
2058 database: DatabaseConfig::default(),
2059 database_pooling: DatabasePooling::default(),
2060 debug: false,
2061 tag_filtering: TagFilteringConfig::default(),
2062 event_limits: EventLimits::default(),
2063 host: "0.0.0.0".to_string(),
2064 http_api: HttpApiConfig::default(),
2065 instance: InstanceConfig::default(),
2066 logging: None,
2067 metrics: MetricsConfig::default(),
2068 mode: "production".to_string(),
2069 port: 6001,
2070 path_prefix: "/".to_string(),
2071 presence: PresenceConfig::default(),
2072 queue: QueueConfig::default(),
2073 rate_limiter: RateLimiterConfig::default(),
2074 shutdown_grace_period: 10,
2075 ssl: SslConfig::default(),
2076 user_authentication_timeout: 3600,
2077 webhooks: WebhooksConfig::default(),
2078 websocket_max_payload_kb: 64,
2079 cleanup: CleanupConfig::default(),
2080 activity_timeout: 120,
2081 cluster_health: ClusterHealthConfig::default(),
2082 unix_socket: UnixSocketConfig::default(),
2083 delta_compression: DeltaCompressionOptionsConfig::default(),
2084 websocket: WebSocketConfig::default(),
2085 connection_recovery: ConnectionRecoveryConfig::default(),
2086 history: HistoryConfig::default(),
2087 presence_history: PresenceHistoryConfig::default(),
2088 idempotency: IdempotencyConfig::default(),
2089 ephemeral: EphemeralConfig::default(),
2090 echo_control: EchoControlConfig::default(),
2091 event_name_filtering: EventNameFilteringConfig::default(),
2092 versioned_messages: VersionedMessagesConfig::default(),
2093 annotations: AnnotationsConfig::default(),
2094 push: PushConfig::default(),
2095 health_check_timeout_ms: 400,
2096 }
2097 }
2098}
2099
2100impl Default for SqsQueueConfig {
2101 fn default() -> Self {
2102 Self {
2103 region: "us-east-1".to_string(),
2104 queue_url_prefix: None,
2105 visibility_timeout: 30,
2106 endpoint_url: None,
2107 max_messages: 10,
2108 wait_time_seconds: 5,
2109 concurrency: 5,
2110 fifo: false,
2111 message_group_id: Some("default".to_string()),
2112 }
2113 }
2114}
2115
2116impl Default for SnsQueueConfig {
2117 fn default() -> Self {
2118 Self {
2119 region: "us-east-1".to_string(),
2120 topic_arn: String::new(),
2121 endpoint_url: None,
2122 }
2123 }
2124}
2125
2126impl Default for RedisAdapterConfig {
2127 fn default() -> Self {
2128 Self {
2129 requests_timeout: 5000,
2130 prefix: "sockudo_adapter:".to_string(),
2131 redis_pub_options: AHashMap::new(),
2132 redis_sub_options: AHashMap::new(),
2133 cluster_mode: false,
2134 }
2135 }
2136}
2137
2138impl Default for RedisClusterAdapterConfig {
2139 fn default() -> Self {
2140 Self {
2141 nodes: vec![],
2142 prefix: "sockudo_adapter:".to_string(),
2143 request_timeout_ms: 1000,
2144 use_connection_manager: true,
2145 use_sharded_pubsub: false,
2146 }
2147 }
2148}
2149
2150impl Default for NatsAdapterConfig {
2151 fn default() -> Self {
2152 Self {
2153 servers: vec!["nats://localhost:4222".to_string()],
2154 prefix: "sockudo_adapter:".to_string(),
2155 request_timeout_ms: 5000,
2156 username: None,
2157 password: None,
2158 token: None,
2159 connection_timeout_ms: 5000,
2160 nodes_number: None,
2161 discovery_max_wait_ms: 1000,
2162 discovery_idle_wait_ms: 150,
2163 subscription_capacity: None,
2164 client_capacity: None,
2165 max_reconnects: None,
2166 presence_sync_chunk_size: None,
2167 }
2168 }
2169}
2170
2171impl Default for PulsarAdapterConfig {
2172 fn default() -> Self {
2173 Self {
2174 url: "pulsar://127.0.0.1:6650".to_string(),
2175 prefix: "sockudo-adapter".to_string(),
2176 request_timeout_ms: 5000,
2177 token: None,
2178 nodes_number: None,
2179 }
2180 }
2181}
2182
2183impl Default for RabbitMqAdapterConfig {
2184 fn default() -> Self {
2185 Self {
2186 url: "amqp://guest:guest@127.0.0.1:5672/%2f".to_string(),
2187 prefix: "sockudo_adapter".to_string(),
2188 request_timeout_ms: 5000,
2189 connection_timeout_ms: 5000,
2190 nodes_number: None,
2191 }
2192 }
2193}
2194
2195impl Default for GooglePubSubAdapterConfig {
2196 fn default() -> Self {
2197 Self {
2198 project_id: "".to_string(),
2199 prefix: "sockudo-adapter".to_string(),
2200 request_timeout_ms: 5000,
2201 emulator_host: None,
2202 nodes_number: None,
2203 }
2204 }
2205}
2206
2207impl Default for KafkaAdapterConfig {
2208 fn default() -> Self {
2209 Self {
2210 brokers: vec!["localhost:9092".to_string()],
2211 prefix: "sockudo_adapter".to_string(),
2212 request_timeout_ms: 5000,
2213 security_protocol: None,
2214 sasl_mechanism: None,
2215 sasl_username: None,
2216 sasl_password: None,
2217 nodes_number: None,
2218 }
2219 }
2220}
2221
2222impl Default for IggyConfig {
2223 fn default() -> Self {
2224 Self {
2225 connection_string: "iggy://iggy:iggy@127.0.0.1:8090".to_string(),
2226 username: None,
2227 password: None,
2228 consumer_name: None,
2229 stream: "sockudo".to_string(),
2230 topic_prefix: "sockudo-adapter".to_string(),
2231 queue_topic_prefix: "sockudo-queue".to_string(),
2232 consumer_group_prefix: "sockudo-workers".to_string(),
2233 request_timeout_ms: 5000,
2234 poll_interval_ms: 50,
2235 poll_batch_size: 100,
2236 partitions_count: 1,
2237 partition_id: 0,
2238 auto_create: true,
2239 start_from_latest: true,
2240 nodes_number: None,
2241 }
2242 }
2243}
2244
2245impl Default for CacheSettings {
2246 fn default() -> Self {
2247 Self {
2248 enabled: true,
2249 ttl: 300,
2250 }
2251 }
2252}
2253
2254impl Default for MemoryCacheOptions {
2255 fn default() -> Self {
2256 Self {
2257 ttl: 300,
2258 cleanup_interval: 60,
2259 max_capacity: 10000,
2260 }
2261 }
2262}
2263
2264impl Default for CacheConfig {
2265 fn default() -> Self {
2266 Self {
2267 driver: CacheDriver::default(),
2268 redis: RedisConfig {
2269 prefix: Some("sockudo_cache:".to_string()),
2270 url_override: None,
2271 cluster_mode: false,
2272 },
2273 memory: MemoryCacheOptions::default(),
2274 }
2275 }
2276}
2277
2278impl Default for ChannelLimits {
2279 fn default() -> Self {
2280 Self {
2281 max_name_length: 200,
2282 cache_ttl: 3600,
2283 }
2284 }
2285}
2286impl Default for CorsConfig {
2287 fn default() -> Self {
2288 Self {
2289 credentials: true,
2290 origin: vec!["*".to_string()],
2291 methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
2292 allowed_headers: vec![
2293 "Authorization".to_string(),
2294 "Content-Type".to_string(),
2295 "X-Requested-With".to_string(),
2296 "Accept".to_string(),
2297 ],
2298 }
2299 }
2300}
2301
2302impl Default for DatabaseConnection {
2303 fn default() -> Self {
2304 Self {
2305 host: "localhost".to_string(),
2306 port: 3306,
2307 username: "root".to_string(),
2308 password: "".to_string(),
2309 database: "sockudo".to_string(),
2310 table_name: "applications".to_string(),
2311 connection_pool_size: 10,
2312 pool_min: None,
2313 pool_max: None,
2314 cache_ttl: 300,
2315 cache_cleanup_interval: 60,
2316 cache_max_capacity: 100,
2317 }
2318 }
2319}
2320
2321impl Default for RedisConnection {
2322 fn default() -> Self {
2323 Self {
2324 host: "127.0.0.1".to_string(),
2325 port: 6379,
2326 db: 0,
2327 username: None,
2328 password: None,
2329 key_prefix: "sockudo:".to_string(),
2330 sentinels: Vec::new(),
2331 sentinel_password: None,
2332 name: "mymaster".to_string(),
2333 cluster: RedisClusterConnection::default(),
2334 cluster_nodes: Vec::new(),
2335 }
2336 }
2337}
2338
2339impl Default for RedisSentinel {
2340 fn default() -> Self {
2341 Self {
2342 host: "localhost".to_string(),
2343 port: 26379,
2344 }
2345 }
2346}
2347
2348impl Default for ClusterNode {
2349 fn default() -> Self {
2350 Self {
2351 host: "127.0.0.1".to_string(),
2352 port: 7000,
2353 }
2354 }
2355}
2356
2357impl Default for DatabasePooling {
2358 fn default() -> Self {
2359 Self {
2360 enabled: true,
2361 min: 2,
2362 max: 10,
2363 }
2364 }
2365}
2366
2367impl Default for EventLimits {
2368 fn default() -> Self {
2369 Self {
2370 max_channels_at_once: 100,
2371 max_name_length: 200,
2372 max_payload_in_kb: 100,
2373 max_batch_size: 10,
2374 }
2375 }
2376}
2377
2378impl Default for IdempotencyConfig {
2379 fn default() -> Self {
2380 Self {
2381 enabled: true,
2382 ttl_seconds: 120,
2383 max_key_length: 128,
2384 }
2385 }
2386}
2387
2388impl Default for EphemeralConfig {
2389 fn default() -> Self {
2390 Self { enabled: true }
2391 }
2392}
2393
2394impl Default for EchoControlConfig {
2395 fn default() -> Self {
2396 Self {
2397 enabled: true,
2398 default_echo_messages: true,
2399 }
2400 }
2401}
2402
2403impl Default for EventNameFilteringConfig {
2404 fn default() -> Self {
2405 Self {
2406 enabled: true,
2407 max_events_per_filter: 50,
2408 max_event_name_length: 200,
2409 }
2410 }
2411}
2412
2413impl Default for ConnectionRecoveryConfig {
2414 fn default() -> Self {
2415 Self {
2416 enabled: false,
2417 buffer_ttl_seconds: 120,
2418 max_buffer_size: 100,
2419 }
2420 }
2421}
2422
2423impl Default for HttpApiConfig {
2424 fn default() -> Self {
2425 Self {
2426 request_limit_in_mb: 10,
2427 accept_traffic: AcceptTraffic::default(),
2428 usage_enabled: true,
2429 }
2430 }
2431}
2432
2433impl Default for AcceptTraffic {
2434 fn default() -> Self {
2435 Self {
2436 memory_threshold: 0.90,
2437 }
2438 }
2439}
2440
2441impl Default for InstanceConfig {
2442 fn default() -> Self {
2443 Self {
2444 process_id: uuid::Uuid::new_v4().to_string(),
2445 }
2446 }
2447}
2448
2449impl Default for LoggingConfig {
2450 fn default() -> Self {
2451 Self {
2452 colors_enabled: true,
2453 include_target: true,
2454 }
2455 }
2456}
2457
2458impl Default for MetricsConfig {
2459 fn default() -> Self {
2460 Self {
2461 enabled: true,
2462 driver: MetricsDriver::default(),
2463 host: "0.0.0.0".to_string(),
2464 prometheus: PrometheusConfig::default(),
2465 tcp_exporter: MetricsTcpExporterConfig::default(),
2466 port: 9601,
2467 }
2468 }
2469}
2470
2471impl Default for PrometheusConfig {
2472 fn default() -> Self {
2473 Self {
2474 prefix: "sockudo_".to_string(),
2475 }
2476 }
2477}
2478
2479impl Default for MetricsTcpExporterConfig {
2480 fn default() -> Self {
2481 Self {
2482 enabled: false,
2483 host: "127.0.0.1".to_string(),
2484 port: 5000,
2485 buffer_size: Some(1024),
2486 }
2487 }
2488}
2489
2490impl Default for PresenceConfig {
2491 fn default() -> Self {
2492 Self {
2493 max_members_per_channel: 100,
2494 max_member_size_in_kb: 2,
2495 }
2496 }
2497}
2498
2499impl Default for RedisQueueConfig {
2500 fn default() -> Self {
2501 Self {
2502 concurrency: 5,
2503 prefix: Some("sockudo_queue:".to_string()),
2504 url_override: None,
2505 cluster_mode: false,
2506 }
2507 }
2508}
2509
2510impl Default for RateLimit {
2511 fn default() -> Self {
2512 Self {
2513 max_requests: 60,
2514 window_seconds: 60,
2515 identifier: Some("default".to_string()),
2516 trust_hops: Some(0),
2517 }
2518 }
2519}
2520
2521impl Default for RateLimiterConfig {
2522 fn default() -> Self {
2523 Self {
2524 enabled: true,
2525 driver: CacheDriver::Memory,
2526 api_rate_limit: RateLimit {
2527 max_requests: 100,
2528 window_seconds: 60,
2529 identifier: Some("api".to_string()),
2530 trust_hops: Some(0),
2531 },
2532 websocket_rate_limit: RateLimit {
2533 max_requests: 20,
2534 window_seconds: 60,
2535 identifier: Some("websocket_connect".to_string()),
2536 trust_hops: Some(0),
2537 },
2538 redis: RedisConfig {
2539 prefix: Some("sockudo_rl:".to_string()),
2540 url_override: None,
2541 cluster_mode: false,
2542 },
2543 }
2544 }
2545}
2546
2547impl Default for SslConfig {
2548 fn default() -> Self {
2549 Self {
2550 enabled: false,
2551 cert_path: "".to_string(),
2552 key_path: "".to_string(),
2553 passphrase: None,
2554 ca_path: None,
2555 redirect_http: false,
2556 http_port: Some(80),
2557 }
2558 }
2559}
2560
2561impl Default for BatchingConfig {
2562 fn default() -> Self {
2563 Self {
2564 enabled: true,
2565 duration: 50,
2566 size: 100,
2567 }
2568 }
2569}
2570
2571impl Default for ClusterHealthConfig {
2572 fn default() -> Self {
2573 Self {
2574 enabled: true,
2575 heartbeat_interval_ms: 10000,
2576 node_timeout_ms: 30000,
2577 cleanup_interval_ms: 10000,
2578 }
2579 }
2580}
2581
2582impl Default for UnixSocketConfig {
2583 fn default() -> Self {
2584 Self {
2585 enabled: false,
2586 path: "/var/run/sockudo/sockudo.sock".to_string(),
2587 permission_mode: 0o660,
2588 }
2589 }
2590}
2591
2592impl Default for DeltaCompressionOptionsConfig {
2593 fn default() -> Self {
2594 Self {
2595 enabled: true,
2596 algorithm: "fossil".to_string(),
2597 full_message_interval: 10,
2598 min_message_size: 100,
2599 max_state_age_secs: 300,
2600 max_channel_states_per_socket: 100,
2601 max_conflation_states_per_channel: Some(100),
2602 conflation_key_path: None,
2603 cluster_coordination: false,
2604 coordination_backend: DeltaCoordinationBackend::Auto,
2605 omit_delta_algorithm: false,
2606 }
2607 }
2608}
2609
#[cfg(test)]
mod tests {
    use super::{
        DeltaCoordinationBackend, PushQueueDriver, PushStorageDriver, QueueDriver, ServerOptions,
        VersionStoreDriver,
    };
    use crate::app::{App, AppPolicy};
    use std::ffi::OsString;
    use std::str::FromStr;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Environment variables that [`EnvGuard::app_bootstrap`] clears before
    /// applying its overrides.
    const APP_BOOTSTRAP_ENV_KEYS: &[&str] = &[
        "SOCKUDO_DEFAULT_APP_ID",
        "SOCKUDO_DEFAULT_APP_KEY",
        "SOCKUDO_DEFAULT_APP_SECRET",
        "SOCKUDO_DEFAULT_APP_ENABLED",
        "SOCKUDO_SKIP_INLINE_APPS",
        "APP_MANAGER_REGISTER_INLINE_APPS",
    ];

    /// Serializes the tests in this module that mutate the process
    /// environment. The test harness runs tests on parallel threads, and the
    /// environment is process-global state.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// RAII guard for temporary environment overrides.
    ///
    /// On construction it takes [`ENV_LOCK`], snapshots every variable it is
    /// about to touch, and applies the requested changes. On drop (including
    /// during a panic unwind) it restores the snapshot and only then releases
    /// the lock, so a failing test cannot leak overrides into later tests.
    struct EnvGuard {
        previous: Vec<(&'static str, Option<OsString>)>,
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvGuard {
        /// Sets each `(key, value)` pair in `overrides`, leaving every other
        /// variable untouched.
        fn set(overrides: &[(&'static str, &'static str)]) -> Self {
            Self::apply(&[], overrides)
        }

        /// Clears every key in [`APP_BOOTSTRAP_ENV_KEYS`], then applies
        /// `overrides` on top.
        fn app_bootstrap(overrides: &[(&'static str, &'static str)]) -> Self {
            Self::apply(APP_BOOTSTRAP_ENV_KEYS, overrides)
        }

        /// Removes every key in `cleared`, then sets every pair in `overrides`.
        /// The prior value of each touched key is recorded exactly once.
        fn apply(cleared: &[&'static str], overrides: &[(&'static str, &'static str)]) -> Self {
            // A poisoned lock only means another env test panicked; its guard
            // already restored the environment during unwinding, so continue.
            let lock = ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner);

            let mut previous: Vec<(&'static str, Option<OsString>)> =
                Vec::with_capacity(cleared.len() + overrides.len());
            let touched = cleared
                .iter()
                .copied()
                .chain(overrides.iter().map(|(key, _)| *key));
            for key in touched {
                // Snapshot only the first sighting so a key that is both
                // cleared and overridden keeps its true original value.
                if previous.iter().all(|(seen, _)| *seen != key) {
                    previous.push((key, std::env::var_os(key)));
                }
            }

            // SAFETY: `ENV_LOCK` is held, so no other test in this module is
            // mutating the environment concurrently. Code outside this module
            // is not covered by the lock.
            unsafe {
                for key in cleared {
                    std::env::remove_var(key);
                }
                for (key, value) in overrides {
                    std::env::set_var(key, value);
                }
            }

            Self {
                previous,
                _lock: lock,
            }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            // SAFETY: `self._lock` is still held here because fields are
            // dropped after this method returns, so restoration is serialized
            // with every other `EnvGuard` in this module.
            unsafe {
                for (key, value) in self.previous.drain(..) {
                    match value {
                        Some(value) => std::env::set_var(key, value),
                        None => std::env::remove_var(key),
                    }
                }
            }
        }
    }

    /// Builds the inline app pushed into `options.app_manager.array.apps` by
    /// the bootstrap tests.
    fn inline_test_app() -> App {
        App::from_policy(
            "app-id".to_string(),
            "app-key".to_string(),
            "app-secret".to_string(),
            true,
            AppPolicy::default(),
        )
    }

    #[test]
    fn queue_driver_parses_broker_backends() {
        assert_eq!(
            QueueDriver::from_str("rabbitmq").unwrap(),
            QueueDriver::RabbitMq
        );
        assert_eq!(QueueDriver::from_str("kafka").unwrap(), QueueDriver::Kafka);
        assert_eq!(
            QueueDriver::from_str("pulsar").unwrap(),
            QueueDriver::Pulsar
        );
        assert_eq!(
            QueueDriver::from_str("google-pubsub").unwrap(),
            QueueDriver::GooglePubSub
        );
    }

    #[test]
    fn delta_coordination_backend_parses_expected_values() {
        assert_eq!(
            DeltaCoordinationBackend::from_str("auto").unwrap(),
            DeltaCoordinationBackend::Auto
        );
        assert_eq!(
            DeltaCoordinationBackend::from_str("redis-cluster").unwrap(),
            DeltaCoordinationBackend::RedisCluster
        );
        assert_eq!(
            DeltaCoordinationBackend::from_str("nats").unwrap(),
            DeltaCoordinationBackend::Nats
        );
    }

    #[tokio::test]
    async fn app_bootstrap_env_overrides_inline_apps() {
        // Each scenario runs in its own scope so its guard restores the
        // environment before the next scenario installs its overrides.
        {
            let _env = EnvGuard::app_bootstrap(&[("SOCKUDO_DEFAULT_APP_ENABLED", "false")]);
            let mut options = ServerOptions::default();
            options.app_manager.array.apps.push(inline_test_app());

            options.override_from_env().await.unwrap();

            assert!(options.app_manager.array.apps.is_empty());
        }

        {
            let _env = EnvGuard::app_bootstrap(&[("SOCKUDO_DEFAULT_APP_ENABLED", "true")]);
            let mut options = ServerOptions::default();
            options.app_manager.array.apps.push(inline_test_app());

            options.override_from_env().await.unwrap();

            assert_eq!(options.app_manager.array.apps.len(), 1);
            assert_eq!(options.app_manager.array.apps[0].id, "app-id");
        }

        {
            let _env = EnvGuard::app_bootstrap(&[
                ("SOCKUDO_DEFAULT_APP_ID", "prod-app"),
                ("SOCKUDO_DEFAULT_APP_KEY", "prod-key"),
                ("SOCKUDO_DEFAULT_APP_SECRET", "prod-secret"),
                ("SOCKUDO_DEFAULT_APP_ENABLED", "true"),
            ]);
            let mut options = ServerOptions::default();
            options.app_manager.array.apps.push(inline_test_app());

            options.override_from_env().await.unwrap();

            assert_eq!(options.app_manager.array.apps.len(), 1);
            let app = &options.app_manager.array.apps[0];
            assert_eq!(app.id, "prod-app");
            assert_eq!(app.key, "prod-key");
            assert_eq!(app.secret, "prod-secret");
            assert!(app.enabled);
        }

        {
            let _env = EnvGuard::app_bootstrap(&[("APP_MANAGER_REGISTER_INLINE_APPS", "false")]);
            let mut options = ServerOptions::default();
            options.app_manager.array.apps.push(inline_test_app());

            options.override_from_env().await.unwrap();

            assert!(options.app_manager.array.apps.is_empty());
        }
    }

    #[tokio::test]
    async fn versioned_messages_driver_overrides_from_env() {
        let _env = EnvGuard::set(&[("VERSIONED_MESSAGES_DRIVER", "postgres")]);

        let mut options = ServerOptions::default();
        options.override_from_env().await.unwrap();

        assert_eq!(
            options.versioned_messages.driver,
            VersionStoreDriver::Postgres
        );
    }

    #[tokio::test]
    async fn push_storage_driver_overrides_from_env() {
        let _env = EnvGuard::set(&[("PUSH_STORAGE_DRIVER", "mysql")]);

        let mut options = ServerOptions::default();
        options.override_from_env().await.unwrap();

        assert_eq!(options.push.storage_driver, PushStorageDriver::Mysql);
    }

    #[tokio::test]
    async fn push_queue_driver_overrides_from_env() {
        let _env = EnvGuard::set(&[("PUSH_QUEUE_DRIVER", "redis-cluster")]);

        let mut options = ServerOptions::default();
        options.override_from_env().await.unwrap();

        assert_eq!(options.push.queue_driver, PushQueueDriver::RedisCluster);
    }

    #[tokio::test]
    async fn websocket_rate_limit_trust_hops_overrides_from_env() {
        let _env = EnvGuard::set(&[("RATE_LIMITER_WS_TRUST_HOPS", "2")]);

        let mut options = ServerOptions::default();
        options.override_from_env().await.unwrap();

        assert_eq!(
            options.rate_limiter.websocket_rate_limit.trust_hops,
            Some(2)
        );
    }

    #[test]
    fn push_config_defaults_follow_capacity_model() {
        let options = ServerOptions::default();

        assert_eq!(options.push.storage_driver, PushStorageDriver::Memory);
        assert_eq!(options.push.queue_driver, PushQueueDriver::Memory);
        assert!(!options.push.fcm_enabled);
        assert!(!options.push.apns_enabled);
        assert!(!options.push.webpush_enabled);
        assert!(!options.push.hms_enabled);
        assert!(!options.push.wns_enabled);
        assert_eq!(options.push.fanout_fast_threshold, 10_000);
        assert_eq!(options.push.fanout_shard_size, 100_000);
        assert_eq!(options.push.publish_status_ttl_days, 30);
        assert_eq!(options.push.default_quotas.acceptance_rps, 100);
        assert_eq!(options.push.circuit_breaker.failure_threshold, 5);
        assert!(options.push.payload_redaction.redact_payload);
    }

    #[tokio::test]
    async fn push_release_env_overrides_are_parsed() {
        let _env = EnvGuard::set(&[
            ("PUSH_FCM_ENABLED", "true"),
            ("PUSH_APNS_ENABLED", "true"),
            ("PUSH_WEBPUSH_ENABLED", "true"),
            ("PUSH_HMS_ENABLED", "true"),
            ("PUSH_WNS_ENABLED", "true"),
            ("PUSH_CREDENTIAL_ENCRYPTION_KEY", "env:key:v1"),
            ("PUSH_FANOUT_FAST_THRESHOLD", "12345"),
            ("PUSH_FANOUT_SHARD_SIZE", "54321"),
            ("PUSH_FANOUT_SYNC_THRESHOLD", "250"),
            ("PUSH_BACKPRESSURE_LAG_THRESHOLD_SECS", "42"),
            ("PUSH_PUBLISH_STATUS_TTL_DAYS", "14"),
            ("PUSH_FAILURE_THRESHOLD", "9"),
            ("PUSH_SCHEDULER_INTERVAL_SECS", "11"),
            ("PUSH_STALE_DEVICE_MAX_AGE_DAYS", "120"),
            ("PUSH_ANALYTICS_ENABLED", "true"),
            ("PUSH_DEFAULT_ACCEPTANCE_RPS", "700"),
            ("PUSH_DEFAULT_DELIVERY_QUOTA_DAILY", "8000"),
            ("PUSH_DEFAULT_FANOUT_MAX", "9000"),
            ("PUSH_DEFAULT_INFLIGHT_MAX", "1000"),
        ]);

        let mut options = ServerOptions::default();
        options.override_from_env().await.unwrap();

        assert!(options.push.fcm_enabled);
        assert!(options.push.apns_enabled);
        assert!(options.push.webpush_enabled);
        assert!(options.push.hms_enabled);
        assert!(options.push.wns_enabled);
        assert_eq!(
            options.push.credential_encryption_key.as_deref(),
            Some("env:key:v1")
        );
        assert_eq!(options.push.fanout_fast_threshold, 12_345);
        assert_eq!(options.push.fanout_shard_size, 54_321);
        assert_eq!(options.push.fanout_sync_threshold, 250);
        assert_eq!(options.push.backpressure_lag_threshold_secs, 42);
        assert_eq!(options.push.publish_status_ttl_days, 14);
        assert_eq!(options.push.circuit_breaker.failure_threshold, 9);
        assert_eq!(options.push.scheduler_interval_secs, 11);
        assert_eq!(options.push.stale_device_max_age_days, 120);
        assert!(options.push.analytics_enabled);
        assert_eq!(options.push.default_quotas.acceptance_rps, 700);
        assert_eq!(options.push.default_quotas.delivery_quota_daily, 8_000);
        assert_eq!(options.push.default_quotas.fanout_max, 9_000);
        assert_eq!(options.push.default_quotas.inflight_max, 1_000);
    }
}
2967
2968impl ClusterHealthConfig {
2969 pub fn validate(&self) -> Result<(), String> {
2970 if self.heartbeat_interval_ms == 0 {
2971 return Err("heartbeat_interval_ms must be greater than 0".to_string());
2972 }
2973 if self.node_timeout_ms == 0 {
2974 return Err("node_timeout_ms must be greater than 0".to_string());
2975 }
2976 if self.cleanup_interval_ms == 0 {
2977 return Err("cleanup_interval_ms must be greater than 0".to_string());
2978 }
2979
2980 if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
2981 return Err(format!(
2982 "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
2983 self.heartbeat_interval_ms,
2984 self.node_timeout_ms,
2985 self.node_timeout_ms / 3
2986 ));
2987 }
2988
2989 if self.cleanup_interval_ms > self.node_timeout_ms {
2990 return Err(format!(
2991 "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
2992 self.cleanup_interval_ms, self.node_timeout_ms
2993 ));
2994 }
2995
2996 Ok(())
2997 }
2998}
2999
3000impl ServerOptions {
3001 pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
3002 let content = tokio::fs::read_to_string(path).await?;
3003 let options: Self = if path.ends_with(".toml") {
3004 toml::from_str(&content)?
3005 } else {
3006 sonic_rs::from_str(&content)?
3008 };
3009 Ok(options)
3010 }
3011
3012 pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
3013 if let Ok(mode) = std::env::var("ENVIRONMENT") {
3015 self.mode = mode;
3016 }
3017 self.debug = parse_bool_env("DEBUG_MODE", self.debug);
3018 if parse_bool_env("DEBUG", false) {
3019 self.debug = true;
3020 info!("DEBUG environment variable forces debug mode ON");
3021 }
3022
3023 self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
3024
3025 if let Ok(host) = std::env::var("HOST") {
3026 self.host = host;
3027 }
3028 self.port = parse_env::<u16>("PORT", self.port);
3029 self.shutdown_grace_period =
3030 parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
3031 self.user_authentication_timeout = parse_env::<u64>(
3032 "USER_AUTHENTICATION_TIMEOUT",
3033 self.user_authentication_timeout,
3034 );
3035 self.websocket_max_payload_kb =
3036 parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
3037 if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
3038 self.instance.process_id = id;
3039 }
3040
3041 if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
3043 self.adapter.driver =
3044 parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
3045 }
3046 self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
3047 "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
3048 self.adapter.buffer_multiplier_per_cpu,
3049 );
3050 self.adapter.enable_socket_counting = parse_env::<bool>(
3051 "ADAPTER_ENABLE_SOCKET_COUNTING",
3052 self.adapter.enable_socket_counting,
3053 );
3054 self.adapter.aggregate_counts =
3055 parse_env::<bool>("ADAPTER_AGGREGATE_COUNTS", self.adapter.aggregate_counts);
3056 self.adapter.fallback_to_local =
3057 parse_env::<bool>("ADAPTER_FALLBACK_TO_LOCAL", self.adapter.fallback_to_local);
3058 if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
3059 self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
3060 }
3061 if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
3062 self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
3063 }
3064 if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
3065 self.app_manager.driver =
3066 parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
3067 }
3068 if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
3069 self.rate_limiter.driver = parse_driver_enum(
3070 driver_str,
3071 self.rate_limiter.driver.clone(),
3072 "RateLimiter Backend",
3073 );
3074 }
3075
3076 if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
3078 self.database.redis.host = host;
3079 }
3080 self.database.redis.port =
3081 parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
3082 if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
3083 self.database.redis.username = if username.is_empty() {
3084 None
3085 } else {
3086 Some(username)
3087 };
3088 }
3089 if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
3090 self.database.redis.password = Some(password);
3091 }
3092 self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
3093 if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
3094 self.database.redis.key_prefix = prefix;
3095 }
3096 if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
3097 self.database.redis.cluster.username = if cluster_username.is_empty() {
3098 None
3099 } else {
3100 Some(cluster_username)
3101 };
3102 }
3103 if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
3104 self.database.redis.cluster.password = Some(cluster_password);
3105 }
3106 self.database.redis.cluster.use_tls = parse_bool_env(
3107 "DATABASE_REDIS_CLUSTER_USE_TLS",
3108 self.database.redis.cluster.use_tls,
3109 );
3110
3111 if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
3113 self.database.mysql.host = host;
3114 }
3115 self.database.mysql.port =
3116 parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
3117 if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
3118 self.database.mysql.username = user;
3119 }
3120 if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
3121 self.database.mysql.password = pass;
3122 }
3123 if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
3124 self.database.mysql.database = db;
3125 }
3126 if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
3127 self.database.mysql.table_name = table;
3128 }
3129 override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
3130
3131 if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
3133 self.database.postgres.host = host;
3134 }
3135 self.database.postgres.port =
3136 parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
3137 if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
3138 self.database.postgres.username = user;
3139 }
3140 if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
3141 self.database.postgres.password = pass;
3142 }
3143 if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
3144 self.database.postgres.database = db;
3145 }
3146 override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
3147
3148 if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
3150 self.database.dynamodb.region = region;
3151 }
3152 if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
3153 self.database.dynamodb.table_name = table;
3154 }
3155 if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
3156 self.database.dynamodb.endpoint_url = Some(endpoint);
3157 }
3158 if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
3159 self.database.dynamodb.aws_access_key_id = Some(key_id);
3160 }
3161 if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
3162 self.database.dynamodb.aws_secret_access_key = Some(secret);
3163 }
3164
3165 if let Ok(url) = std::env::var("DATABASE_SURREALDB_URL") {
3167 self.database.surrealdb.url = url;
3168 }
3169 if let Ok(namespace) = std::env::var("DATABASE_SURREALDB_NAMESPACE") {
3170 self.database.surrealdb.namespace = namespace;
3171 }
3172 if let Ok(database) = std::env::var("DATABASE_SURREALDB_DATABASE") {
3173 self.database.surrealdb.database = database;
3174 }
3175 if let Ok(username) = std::env::var("DATABASE_SURREALDB_USERNAME") {
3176 self.database.surrealdb.username = username;
3177 }
3178 if let Ok(password) = std::env::var("DATABASE_SURREALDB_PASSWORD") {
3179 self.database.surrealdb.password = password;
3180 }
3181 if let Ok(table) = std::env::var("DATABASE_SURREALDB_TABLE_NAME") {
3182 self.database.surrealdb.table_name = table;
3183 }
3184
3185 let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
3187 let node_list: Vec<String> = nodes
3188 .split(',')
3189 .map(|s| s.trim())
3190 .filter(|s| !s.is_empty())
3191 .map(ToString::to_string)
3192 .collect();
3193
3194 options.adapter.cluster.nodes = node_list.clone();
3195 options.queue.redis_cluster.nodes = node_list.clone();
3196
3197 let parsed_nodes: Vec<ClusterNode> = node_list
3198 .iter()
3199 .filter_map(|seed| ClusterNode::from_seed(seed))
3200 .collect();
3201 options.database.redis.cluster.nodes = parsed_nodes.clone();
3202 options.database.redis.cluster_nodes = parsed_nodes;
3203 };
3204
3205 if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
3206 apply_redis_cluster_nodes(self, &nodes);
3207 }
3208 if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
3209 apply_redis_cluster_nodes(self, &nodes);
3210 }
3211 self.queue.redis_cluster.concurrency = parse_env::<u32>(
3212 "REDIS_CLUSTER_QUEUE_CONCURRENCY",
3213 self.queue.redis_cluster.concurrency,
3214 );
3215 if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
3216 self.queue.redis_cluster.prefix = Some(prefix);
3217 }
3218
3219 self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
3221 if let Ok(val) = std::env::var("SSL_CERT_PATH") {
3222 self.ssl.cert_path = val;
3223 }
3224 if let Ok(val) = std::env::var("SSL_KEY_PATH") {
3225 self.ssl.key_path = val;
3226 }
3227 self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
3228 if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
3229 self.ssl.http_port = Some(port);
3230 }
3231
3232 self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
3234 if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
3235 self.unix_socket.path = path;
3236 }
3237 if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
3238 if mode_str.chars().all(|c| c.is_digit(8)) {
3239 if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
3240 if mode <= 0o777 {
3241 self.unix_socket.permission_mode = mode;
3242 } else {
3243 warn!(
3244 "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
3245 mode_str, self.unix_socket.permission_mode
3246 );
3247 }
3248 } else {
3249 warn!(
3250 "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
3251 mode_str, self.unix_socket.permission_mode
3252 );
3253 }
3254 } else {
3255 warn!(
3256 "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
3257 mode_str, self.unix_socket.permission_mode
3258 );
3259 }
3260 }
3261
3262 if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
3264 self.metrics.driver =
3265 parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
3266 }
3267 self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
3268 if let Ok(val) = std::env::var("METRICS_HOST") {
3269 self.metrics.host = val;
3270 }
3271 self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
3272 if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
3273 self.metrics.prometheus.prefix = val;
3274 }
3275 self.metrics.tcp_exporter.enabled = parse_bool_env(
3276 "METRICS_TCP_EXPORTER_ENABLED",
3277 self.metrics.tcp_exporter.enabled,
3278 );
3279 if let Ok(val) = std::env::var("METRICS_TCP_EXPORTER_HOST") {
3280 self.metrics.tcp_exporter.host = val;
3281 }
3282 self.metrics.tcp_exporter.port =
3283 parse_env::<u16>("METRICS_TCP_EXPORTER_PORT", self.metrics.tcp_exporter.port);
3284 if let Some(buffer_size) = parse_env_optional::<usize>("METRICS_TCP_EXPORTER_BUFFER_SIZE") {
3285 self.metrics.tcp_exporter.buffer_size = Some(buffer_size);
3286 }
3287
3288 self.http_api.usage_enabled =
3290 parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
3291
3292 self.rate_limiter.enabled =
3294 parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
3295 self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
3296 "RATE_LIMITER_API_MAX_REQUESTS",
3297 self.rate_limiter.api_rate_limit.max_requests,
3298 );
3299 self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
3300 "RATE_LIMITER_API_WINDOW_SECONDS",
3301 self.rate_limiter.api_rate_limit.window_seconds,
3302 );
3303 if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
3304 self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
3305 }
3306 self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
3307 "RATE_LIMITER_WS_MAX_REQUESTS",
3308 self.rate_limiter.websocket_rate_limit.max_requests,
3309 );
3310 self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
3311 "RATE_LIMITER_WS_WINDOW_SECONDS",
3312 self.rate_limiter.websocket_rate_limit.window_seconds,
3313 );
3314 if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_WS_TRUST_HOPS") {
3315 self.rate_limiter.websocket_rate_limit.trust_hops = Some(hops);
3316 }
3317 if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
3318 self.rate_limiter.redis.prefix = Some(prefix);
3319 }
3320
3321 self.queue.redis.concurrency =
3323 parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
3324 if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
3325 self.queue.redis.prefix = Some(prefix);
3326 }
3327
3328 if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
3330 self.queue.sqs.region = region;
3331 }
3332 self.queue.sqs.visibility_timeout = parse_env::<i32>(
3333 "QUEUE_SQS_VISIBILITY_TIMEOUT",
3334 self.queue.sqs.visibility_timeout,
3335 );
3336 self.queue.sqs.max_messages =
3337 parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
3338 self.queue.sqs.wait_time_seconds = parse_env::<i32>(
3339 "QUEUE_SQS_WAIT_TIME_SECONDS",
3340 self.queue.sqs.wait_time_seconds,
3341 );
3342 self.queue.sqs.concurrency =
3343 parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
3344 self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
3345 if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
3346 self.queue.sqs.endpoint_url = Some(endpoint);
3347 }
3348
3349 if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
3351 self.queue.sns.region = region;
3352 }
3353 if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
3354 self.queue.sns.topic_arn = topic_arn;
3355 }
3356 if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
3357 self.queue.sns.endpoint_url = Some(endpoint);
3358 }
3359
3360 self.webhooks.batching.enabled =
3362 parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
3363 self.webhooks.batching.duration =
3364 parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
3365 self.webhooks.batching.size =
3366 parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
3367
3368 if let Ok(servers) = std::env::var("NATS_SERVERS") {
3370 self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
3371 }
3372 if let Ok(user) = std::env::var("NATS_USERNAME") {
3373 self.adapter.nats.username = Some(user);
3374 }
3375 if let Ok(pass) = std::env::var("NATS_PASSWORD") {
3376 self.adapter.nats.password = Some(pass);
3377 }
3378 if let Ok(token) = std::env::var("NATS_TOKEN") {
3379 self.adapter.nats.token = Some(token);
3380 }
3381 if let Ok(prefix) = std::env::var("NATS_PREFIX") {
3382 self.adapter.nats.prefix = prefix;
3383 }
3384 self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
3385 "NATS_CONNECTION_TIMEOUT_MS",
3386 self.adapter.nats.connection_timeout_ms,
3387 );
3388 self.adapter.nats.request_timeout_ms = parse_env::<u64>(
3389 "NATS_REQUEST_TIMEOUT_MS",
3390 self.adapter.nats.request_timeout_ms,
3391 );
3392 self.adapter.nats.discovery_max_wait_ms = parse_env::<u64>(
3393 "NATS_DISCOVERY_MAX_WAIT_MS",
3394 self.adapter.nats.discovery_max_wait_ms,
3395 );
3396 self.adapter.nats.discovery_idle_wait_ms = parse_env::<u64>(
3397 "NATS_DISCOVERY_IDLE_WAIT_MS",
3398 self.adapter.nats.discovery_idle_wait_ms,
3399 );
3400 if let Some(nodes) = parse_env_optional::<u32>("NATS_NODES_NUMBER") {
3401 self.adapter.nats.nodes_number = Some(nodes);
3402 }
3403 if let Some(v) = parse_env_optional::<usize>("NATS_SUBSCRIPTION_CAPACITY") {
3404 self.adapter.nats.subscription_capacity = Some(v);
3405 }
3406 if let Some(v) = parse_env_optional::<usize>("NATS_CLIENT_CAPACITY") {
3407 self.adapter.nats.client_capacity = Some(v);
3408 }
3409 if let Some(v) = parse_env_optional::<usize>("NATS_MAX_RECONNECTS") {
3410 self.adapter.nats.max_reconnects = Some(v);
3411 }
3412 if let Some(v) = parse_env_optional::<usize>("NATS_PRESENCE_SYNC_CHUNK_SIZE") {
3413 self.adapter.nats.presence_sync_chunk_size = Some(v);
3414 }
3415
3416 if let Ok(url) = std::env::var("PULSAR_URL") {
3418 self.adapter.pulsar.url = url;
3419 }
3420 if let Ok(prefix) = std::env::var("PULSAR_PREFIX") {
3421 self.adapter.pulsar.prefix = prefix;
3422 }
3423 if let Ok(token) = std::env::var("PULSAR_TOKEN") {
3424 self.adapter.pulsar.token = Some(token);
3425 }
3426 self.adapter.pulsar.request_timeout_ms = parse_env::<u64>(
3427 "PULSAR_REQUEST_TIMEOUT_MS",
3428 self.adapter.pulsar.request_timeout_ms,
3429 );
3430 if let Some(nodes) = parse_env_optional::<u32>("PULSAR_NODES_NUMBER") {
3431 self.adapter.pulsar.nodes_number = Some(nodes);
3432 }
3433
3434 if let Ok(url) = std::env::var("RABBITMQ_URL") {
3436 self.adapter.rabbitmq.url = url;
3437 }
3438 if let Ok(prefix) = std::env::var("RABBITMQ_PREFIX") {
3439 self.adapter.rabbitmq.prefix = prefix;
3440 }
3441 self.adapter.rabbitmq.connection_timeout_ms = parse_env::<u64>(
3442 "RABBITMQ_CONNECTION_TIMEOUT_MS",
3443 self.adapter.rabbitmq.connection_timeout_ms,
3444 );
3445 self.adapter.rabbitmq.request_timeout_ms = parse_env::<u64>(
3446 "RABBITMQ_REQUEST_TIMEOUT_MS",
3447 self.adapter.rabbitmq.request_timeout_ms,
3448 );
3449 if let Some(nodes) = parse_env_optional::<u32>("RABBITMQ_NODES_NUMBER") {
3450 self.adapter.rabbitmq.nodes_number = Some(nodes);
3451 }
3452
3453 if let Ok(project_id) = std::env::var("GOOGLE_PUBSUB_PROJECT_ID") {
3455 self.adapter.google_pubsub.project_id = project_id;
3456 }
3457 if let Ok(prefix) = std::env::var("GOOGLE_PUBSUB_PREFIX") {
3458 self.adapter.google_pubsub.prefix = prefix;
3459 }
3460 if let Ok(emulator_host) = std::env::var("PUBSUB_EMULATOR_HOST") {
3461 self.adapter.google_pubsub.emulator_host = Some(emulator_host);
3462 }
3463 self.adapter.google_pubsub.request_timeout_ms = parse_env::<u64>(
3464 "GOOGLE_PUBSUB_REQUEST_TIMEOUT_MS",
3465 self.adapter.google_pubsub.request_timeout_ms,
3466 );
3467 if let Some(nodes) = parse_env_optional::<u32>("GOOGLE_PUBSUB_NODES_NUMBER") {
3468 self.adapter.google_pubsub.nodes_number = Some(nodes);
3469 }
3470
3471 if let Ok(brokers) = std::env::var("KAFKA_BROKERS") {
3473 self.adapter.kafka.brokers = brokers.split(',').map(|s| s.trim().to_string()).collect();
3474 }
3475 if let Ok(prefix) = std::env::var("KAFKA_PREFIX") {
3476 self.adapter.kafka.prefix = prefix;
3477 }
3478 if let Ok(protocol) = std::env::var("KAFKA_SECURITY_PROTOCOL") {
3479 self.adapter.kafka.security_protocol = Some(protocol);
3480 }
3481 if let Ok(mechanism) = std::env::var("KAFKA_SASL_MECHANISM") {
3482 self.adapter.kafka.sasl_mechanism = Some(mechanism);
3483 }
3484 if let Ok(username) = std::env::var("KAFKA_SASL_USERNAME") {
3485 self.adapter.kafka.sasl_username = Some(username);
3486 }
3487 if let Ok(password) = std::env::var("KAFKA_SASL_PASSWORD") {
3488 self.adapter.kafka.sasl_password = Some(password);
3489 }
3490 self.adapter.kafka.request_timeout_ms = parse_env::<u64>(
3491 "KAFKA_REQUEST_TIMEOUT_MS",
3492 self.adapter.kafka.request_timeout_ms,
3493 );
3494 if let Some(nodes) = parse_env_optional::<u32>("KAFKA_NODES_NUMBER") {
3495 self.adapter.kafka.nodes_number = Some(nodes);
3496 }
3497
3498 if let Ok(connection_string) = std::env::var("IGGY_CONNECTION_STRING") {
3500 self.adapter.iggy.connection_string = connection_string.clone();
3501 self.queue.iggy.connection_string = connection_string;
3502 }
3503 if let Ok(username) = std::env::var("IGGY_USERNAME") {
3504 let username = (!username.is_empty()).then_some(username);
3505 self.adapter.iggy.username = username.clone();
3506 self.queue.iggy.username = username;
3507 }
3508 if let Ok(password) = std::env::var("IGGY_PASSWORD") {
3509 let password = (!password.is_empty()).then_some(password);
3510 self.adapter.iggy.password = password.clone();
3511 self.queue.iggy.password = password;
3512 }
3513 if let Ok(consumer_name) = std::env::var("IGGY_CONSUMER_NAME") {
3514 let consumer_name = (!consumer_name.is_empty()).then_some(consumer_name);
3515 self.adapter.iggy.consumer_name = consumer_name.clone();
3516 self.queue.iggy.consumer_name = consumer_name;
3517 } else if let Ok(process_id) = std::env::var("INSTANCE_PROCESS_ID") {
3518 let process_id = (!process_id.is_empty()).then_some(process_id);
3519 self.adapter.iggy.consumer_name = process_id.clone();
3520 self.queue.iggy.consumer_name = process_id;
3521 }
3522 if let Ok(stream) = std::env::var("IGGY_STREAM") {
3523 self.adapter.iggy.stream = stream.clone();
3524 self.queue.iggy.stream = stream;
3525 }
3526 if let Ok(prefix) = std::env::var("IGGY_TOPIC_PREFIX") {
3527 self.adapter.iggy.topic_prefix = prefix;
3528 }
3529 if let Ok(prefix) = std::env::var("IGGY_QUEUE_TOPIC_PREFIX") {
3530 self.queue.iggy.queue_topic_prefix = prefix;
3531 }
3532 if let Ok(prefix) = std::env::var("IGGY_CONSUMER_GROUP_PREFIX") {
3533 self.queue.iggy.consumer_group_prefix = prefix;
3534 }
3535 self.adapter.iggy.request_timeout_ms = parse_env::<u64>(
3536 "IGGY_REQUEST_TIMEOUT_MS",
3537 self.adapter.iggy.request_timeout_ms,
3538 );
3539 self.queue.iggy.request_timeout_ms = parse_env::<u64>(
3540 "IGGY_REQUEST_TIMEOUT_MS",
3541 self.queue.iggy.request_timeout_ms,
3542 );
3543 self.adapter.iggy.poll_interval_ms =
3544 parse_env::<u64>("IGGY_POLL_INTERVAL_MS", self.adapter.iggy.poll_interval_ms);
3545 self.queue.iggy.poll_interval_ms =
3546 parse_env::<u64>("IGGY_POLL_INTERVAL_MS", self.queue.iggy.poll_interval_ms);
3547 self.adapter.iggy.poll_batch_size =
3548 parse_env::<u32>("IGGY_POLL_BATCH_SIZE", self.adapter.iggy.poll_batch_size);
3549 self.queue.iggy.poll_batch_size =
3550 parse_env::<u32>("IGGY_POLL_BATCH_SIZE", self.queue.iggy.poll_batch_size);
3551 self.adapter.iggy.partitions_count = parse_env::<u32>(
3552 "ADAPTER_IGGY_PARTITIONS_COUNT",
3553 parse_env::<u32>("IGGY_PARTITIONS_COUNT", self.adapter.iggy.partitions_count),
3554 );
3555 self.queue.iggy.partitions_count = parse_env::<u32>(
3556 "QUEUE_IGGY_PARTITIONS_COUNT",
3557 parse_env::<u32>("IGGY_PARTITIONS_COUNT", self.queue.iggy.partitions_count),
3558 );
3559 self.adapter.iggy.partition_id = parse_env::<u32>(
3560 "ADAPTER_IGGY_PARTITION_ID",
3561 parse_env::<u32>("IGGY_PARTITION_ID", self.adapter.iggy.partition_id),
3562 );
3563 self.queue.iggy.partition_id = parse_env::<u32>(
3564 "QUEUE_IGGY_PARTITION_ID",
3565 parse_env::<u32>("IGGY_PARTITION_ID", self.queue.iggy.partition_id),
3566 );
3567 self.adapter.iggy.auto_create =
3568 parse_env::<bool>("IGGY_AUTO_CREATE", self.adapter.iggy.auto_create);
3569 self.queue.iggy.auto_create =
3570 parse_env::<bool>("IGGY_AUTO_CREATE", self.queue.iggy.auto_create);
3571 self.adapter.iggy.start_from_latest = parse_env::<bool>(
3572 "IGGY_START_FROM_LATEST",
3573 self.adapter.iggy.start_from_latest,
3574 );
3575 if let Some(nodes) = parse_env_optional::<u32>("IGGY_NODES_NUMBER") {
3576 self.adapter.iggy.nodes_number = Some(nodes);
3577 self.queue.iggy.nodes_number = Some(nodes);
3578 }
3579
3580 if let Ok(origins) = std::env::var("CORS_ORIGINS") {
3582 let parsed: Vec<String> = origins.split(',').map(|s| s.trim().to_string()).collect();
3583 if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&parsed) {
3584 warn!(
3585 "CORS_ORIGINS contains invalid patterns: {}. Keeping previous CORS origins.",
3586 e
3587 );
3588 } else {
3589 self.cors.origin = parsed;
3590 }
3591 }
3592 if let Ok(methods) = std::env::var("CORS_METHODS") {
3593 self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
3594 }
3595 if let Ok(headers) = std::env::var("CORS_HEADERS") {
3596 self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
3597 }
3598 self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
3599
3600 self.database_pooling.enabled =
3602 parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
3603 if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
3604 self.database_pooling.min = min;
3605 }
3606 if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
3607 self.database_pooling.max = max;
3608 }
3609
3610 if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
3611 self.database.mysql.connection_pool_size = pool_size;
3612 self.database.postgres.connection_pool_size = pool_size;
3613 }
3614 if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
3615 self.app_manager.cache.ttl = cache_ttl;
3616 self.channel_limits.cache_ttl = cache_ttl;
3617 self.database.mysql.cache_ttl = cache_ttl;
3618 self.database.postgres.cache_ttl = cache_ttl;
3619 self.database.surrealdb.cache_ttl = cache_ttl;
3620 self.cache.memory.ttl = cache_ttl;
3621 }
3622 if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
3623 self.database.mysql.cache_cleanup_interval = cleanup_interval;
3624 self.database.postgres.cache_cleanup_interval = cleanup_interval;
3625 self.cache.memory.cleanup_interval = cleanup_interval;
3626 }
3627 if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
3628 self.database.mysql.cache_max_capacity = max_capacity;
3629 self.database.postgres.cache_max_capacity = max_capacity;
3630 self.database.surrealdb.cache_max_capacity = max_capacity;
3631 self.cache.memory.max_capacity = max_capacity;
3632 }
3633
3634 let skip_inline_apps = parse_bool_env("SOCKUDO_SKIP_INLINE_APPS", false)
3635 || !parse_bool_env("APP_MANAGER_REGISTER_INLINE_APPS", true);
3636 if skip_inline_apps {
3637 let app_count = self.app_manager.array.apps.len();
3638 self.app_manager.array.apps.clear();
3639 if app_count > 0 {
3640 info!(
3641 "Skipping {} inline app(s) from configuration due to environment override",
3642 app_count
3643 );
3644 }
3645 }
3646
3647 let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID").ok();
3648 let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY").ok();
3649 let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET").ok();
3650 let default_app_enabled_env = std::env::var("SOCKUDO_DEFAULT_APP_ENABLED").ok();
3651 let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
3652 let default_app_credentials_configured =
3653 default_app_id.is_some() || default_app_key.is_some() || default_app_secret.is_some();
3654 let default_app_env_configured =
3655 default_app_credentials_configured || default_app_enabled_env.is_some();
3656 let default_app_should_override_inline = default_app_credentials_configured
3657 || default_app_enabled_env.is_some_and(|_| !default_app_enabled);
3658
3659 if default_app_should_override_inline {
3660 let app_count = self.app_manager.array.apps.len();
3661 self.app_manager.array.apps.clear();
3662 if app_count > 0 {
3663 info!(
3664 "Replacing {} inline app(s) from configuration with SOCKUDO_DEFAULT_APP_* settings",
3665 app_count
3666 );
3667 }
3668 }
3669
3670 if let (Some(app_id), Some(app_key), Some(app_secret)) =
3671 (default_app_id, default_app_key, default_app_secret)
3672 && default_app_enabled
3673 {
3674 let default_app = App::from_policy(
3675 app_id,
3676 app_key,
3677 app_secret,
3678 default_app_enabled,
3679 crate::app::AppPolicy {
3680 limits: crate::app::AppLimitsPolicy {
3681 max_connections: parse_env::<u32>(
3682 "SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS",
3683 100,
3684 ),
3685 max_backend_events_per_second: Some(parse_env::<u32>(
3686 "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
3687 100,
3688 )),
3689 max_client_events_per_second: parse_env::<u32>(
3690 "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
3691 100,
3692 ),
3693 max_read_requests_per_second: Some(parse_env::<u32>(
3694 "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
3695 100,
3696 )),
3697 max_presence_members_per_channel: Some(parse_env::<u32>(
3698 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
3699 100,
3700 )),
3701 max_presence_member_size_in_kb: Some(parse_env::<u32>(
3702 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
3703 100,
3704 )),
3705 max_channel_name_length: Some(parse_env::<u32>(
3706 "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
3707 100,
3708 )),
3709 max_event_channels_at_once: Some(parse_env::<u32>(
3710 "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
3711 100,
3712 )),
3713 max_event_name_length: Some(parse_env::<u32>(
3714 "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
3715 100,
3716 )),
3717 max_event_payload_in_kb: Some(parse_env::<u32>(
3718 "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
3719 100,
3720 )),
3721 max_event_batch_size: Some(parse_env::<u32>(
3722 "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
3723 100,
3724 )),
3725 decay_seconds: None,
3726 terminate_on_limit: false,
3727 message_rate_limit: None,
3728 },
3729 features: crate::app::AppFeaturesPolicy {
3730 enable_client_messages: std::env::var(
3731 "SOCKUDO_DEFAULT_APP_ENABLE_CLIENT_MESSAGES",
3732 )
3733 .ok()
3734 .map(|value| {
3735 matches!(
3736 value.trim().to_ascii_lowercase().as_str(),
3737 "true" | "1" | "yes" | "on"
3738 )
3739 })
3740 .unwrap_or_else(|| parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false)),
3741 enable_user_authentication: Some(parse_bool_env(
3742 "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
3743 false,
3744 )),
3745 enable_watchlist_events: Some(parse_bool_env(
3746 "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
3747 false,
3748 )),
3749 },
3750 channels: crate::app::AppChannelsPolicy {
3751 allowed_origins: {
3752 if let Ok(origins_str) =
3753 std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS")
3754 {
3755 if !origins_str.is_empty() {
3756 Some(
3757 origins_str
3758 .split(',')
3759 .map(|s| s.trim().to_string())
3760 .collect(),
3761 )
3762 } else {
3763 None
3764 }
3765 } else {
3766 None
3767 }
3768 },
3769 annotations_enabled: Some(parse_bool_env(
3770 "SOCKUDO_DEFAULT_APP_ANNOTATIONS_ENABLED",
3771 false,
3772 )),
3773 channel_delta_compression: None,
3774 channel_namespaces: None,
3775 },
3776 webhooks: None,
3777 idempotency: None,
3778 connection_recovery: None,
3779 history: None,
3780 presence_history: None,
3781 },
3782 );
3783
3784 self.app_manager.array.apps.push(default_app);
3785 info!("Successfully registered default app from env");
3786 } else if default_app_env_configured && !default_app_enabled {
3787 info!("Default app registration disabled by SOCKUDO_DEFAULT_APP_ENABLED=false");
3788 } else if default_app_credentials_configured {
3789 warn!(
3790 "SOCKUDO_DEFAULT_APP_* environment was configured but id, key, and secret were not all provided; no default app registered"
3791 );
3792 }
3793
3794 if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
3796 info!("Applying REDIS_URL environment variable override");
3797
3798 let redis_url_json = sonic_rs::json!(redis_url_env);
3799
3800 self.adapter
3801 .redis
3802 .redis_pub_options
3803 .insert("url".to_string(), redis_url_json.clone());
3804 self.adapter
3805 .redis
3806 .redis_sub_options
3807 .insert("url".to_string(), redis_url_json);
3808
3809 self.cache.redis.url_override = Some(redis_url_env.clone());
3810 self.queue.redis.url_override = Some(redis_url_env.clone());
3811 self.rate_limiter.redis.url_override = Some(redis_url_env);
3812 }
3813
3814 let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
3816 let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
3817 if has_colors_env || has_target_env {
3818 let logging_config = self.logging.get_or_insert_with(Default::default);
3819 if has_colors_env {
3820 logging_config.colors_enabled =
3821 parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
3822 }
3823 if has_target_env {
3824 logging_config.include_target =
3825 parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
3826 }
3827 }
3828
3829 self.cleanup.async_enabled =
3831 parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
3832 self.cleanup.fallback_to_sync =
3833 parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
3834 self.cleanup.queue_buffer_size =
3835 parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
3836 self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
3837 self.cleanup.batch_timeout_ms =
3838 parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
3839 if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
3840 self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
3841 WorkerThreadsConfig::Auto
3842 } else if let Ok(n) = worker_threads_str.parse::<usize>() {
3843 WorkerThreadsConfig::Fixed(n)
3844 } else {
3845 warn!(
3846 "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
3847 worker_threads_str
3848 );
3849 self.cleanup.worker_threads.clone()
3850 };
3851 }
3852 self.cleanup.max_retry_attempts = parse_env::<u32>(
3853 "CLEANUP_MAX_RETRY_ATTEMPTS",
3854 self.cleanup.max_retry_attempts,
3855 );
3856
3857 self.cluster_health.enabled =
3859 parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
3860 self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
3861 "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
3862 self.cluster_health.heartbeat_interval_ms,
3863 );
3864 self.cluster_health.node_timeout_ms = parse_env::<u64>(
3865 "CLUSTER_HEALTH_NODE_TIMEOUT",
3866 self.cluster_health.node_timeout_ms,
3867 );
3868 self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
3869 "CLUSTER_HEALTH_CLEANUP_INTERVAL",
3870 self.cluster_health.cleanup_interval_ms,
3871 );
3872
3873 self.health_check_timeout_ms =
3875 parse_env::<u64>("HEALTH_CHECK_TIMEOUT_MS", self.health_check_timeout_ms);
3876
3877 self.tag_filtering.enabled =
3879 parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
3880
3881 if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
3883 if val.to_lowercase() == "none" || val == "0" {
3884 self.websocket.max_messages = None;
3885 } else if let Ok(n) = val.parse::<usize>() {
3886 self.websocket.max_messages = Some(n);
3887 }
3888 }
3889 if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
3890 if val.to_lowercase() == "none" || val == "0" {
3891 self.websocket.max_bytes = None;
3892 } else if let Ok(n) = val.parse::<usize>() {
3893 self.websocket.max_bytes = Some(n);
3894 }
3895 }
3896 self.websocket.disconnect_on_buffer_full = parse_bool_env(
3897 "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
3898 self.websocket.disconnect_on_buffer_full,
3899 );
3900 self.websocket.max_message_size = parse_env::<usize>(
3901 "WEBSOCKET_MAX_MESSAGE_SIZE",
3902 self.websocket.max_message_size,
3903 );
3904 self.websocket.max_frame_size =
3905 parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
3906 self.websocket.write_buffer_size = parse_env::<usize>(
3907 "WEBSOCKET_WRITE_BUFFER_SIZE",
3908 self.websocket.write_buffer_size,
3909 );
3910 self.websocket.max_backpressure = parse_env::<usize>(
3911 "WEBSOCKET_MAX_BACKPRESSURE",
3912 self.websocket.max_backpressure,
3913 );
3914 self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
3915 self.websocket.ping_interval =
3916 parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
3917 self.websocket.idle_timeout =
3918 parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
3919 if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
3920 self.websocket.compression = mode;
3921 }
3922
3923 self.connection_recovery.enabled = parse_bool_env(
3925 "CONNECTION_RECOVERY_ENABLED",
3926 self.connection_recovery.enabled,
3927 );
3928 self.connection_recovery.buffer_ttl_seconds = parse_env::<u64>(
3929 "CONNECTION_RECOVERY_BUFFER_TTL",
3930 self.connection_recovery.buffer_ttl_seconds,
3931 );
3932 self.connection_recovery.max_buffer_size = parse_env::<usize>(
3933 "CONNECTION_RECOVERY_MAX_BUFFER_SIZE",
3934 self.connection_recovery.max_buffer_size,
3935 );
3936
3937 self.history.enabled = parse_bool_env("HISTORY_ENABLED", self.history.enabled);
3938 self.history.rewind_enabled =
3939 parse_bool_env("HISTORY_REWIND_ENABLED", self.history.rewind_enabled);
3940 self.history.retention_window_seconds = parse_env::<u64>(
3941 "HISTORY_RETENTION_WINDOW_SECONDS",
3942 self.history.retention_window_seconds,
3943 );
3944 self.history.max_page_size =
3945 parse_env::<usize>("HISTORY_MAX_PAGE_SIZE", self.history.max_page_size);
3946 self.history.writer_shards =
3947 parse_env::<usize>("HISTORY_WRITER_SHARDS", self.history.writer_shards);
3948 self.history.writer_queue_capacity = parse_env::<usize>(
3949 "HISTORY_WRITER_QUEUE_CAPACITY",
3950 self.history.writer_queue_capacity,
3951 );
3952 if let Ok(backend) = std::env::var("HISTORY_BACKEND") {
3953 self.history.backend = HistoryBackend::from_str(&backend)?;
3954 }
3955 if let Ok(max_messages) = std::env::var("HISTORY_MAX_MESSAGES_PER_CHANNEL") {
3956 self.history.max_messages_per_channel = Some(
3957 max_messages
3958 .parse::<usize>()
3959 .map_err(|e| format!("Invalid HISTORY_MAX_MESSAGES_PER_CHANNEL: {e}"))?,
3960 );
3961 }
3962 if let Ok(max_bytes) = std::env::var("HISTORY_MAX_BYTES_PER_CHANNEL") {
3963 self.history.max_bytes_per_channel = Some(
3964 max_bytes
3965 .parse::<u64>()
3966 .map_err(|e| format!("Invalid HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
3967 );
3968 }
3969 if let Ok(table_prefix) = std::env::var("HISTORY_POSTGRES_TABLE_PREFIX") {
3970 self.history.postgres.table_prefix = table_prefix;
3971 }
3972 self.history.postgres.write_timeout_ms = parse_env::<u64>(
3973 "HISTORY_POSTGRES_WRITE_TIMEOUT_MS",
3974 self.history.postgres.write_timeout_ms,
3975 );
3976 self.history.purge_interval_seconds = parse_env::<u64>(
3977 "HISTORY_PURGE_INTERVAL_SECONDS",
3978 self.history.purge_interval_seconds,
3979 );
3980 self.history.purge_batch_size =
3981 parse_env::<usize>("HISTORY_PURGE_BATCH_SIZE", self.history.purge_batch_size);
3982 self.history.max_purge_per_tick = parse_env::<usize>(
3983 "HISTORY_MAX_PURGE_PER_TICK",
3984 self.history.max_purge_per_tick,
3985 );
3986
3987 self.presence_history.enabled =
3988 parse_bool_env("PRESENCE_HISTORY_ENABLED", self.presence_history.enabled);
3989 self.presence_history.retention_window_seconds = parse_env::<u64>(
3990 "PRESENCE_HISTORY_RETENTION_WINDOW_SECONDS",
3991 self.presence_history.retention_window_seconds,
3992 );
3993 self.presence_history.max_page_size = parse_env::<usize>(
3994 "PRESENCE_HISTORY_MAX_PAGE_SIZE",
3995 self.presence_history.max_page_size,
3996 );
3997 if let Ok(max_events) = std::env::var("PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL") {
3998 self.presence_history.max_events_per_channel =
3999 Some(max_events.parse::<usize>().map_err(|e| {
4000 format!("Invalid PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL: {e}")
4001 })?);
4002 }
4003 if let Ok(max_bytes) = std::env::var("PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL") {
4004 self.presence_history.max_bytes_per_channel = Some(
4005 max_bytes
4006 .parse::<u64>()
4007 .map_err(|e| format!("Invalid PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
4008 );
4009 }
4010 self.idempotency.enabled = parse_bool_env("IDEMPOTENCY_ENABLED", self.idempotency.enabled);
4011 self.idempotency.ttl_seconds =
4012 parse_env::<u64>("IDEMPOTENCY_TTL_SECONDS", self.idempotency.ttl_seconds);
4013 self.idempotency.max_key_length = parse_env::<usize>(
4014 "IDEMPOTENCY_MAX_KEY_LENGTH",
4015 self.idempotency.max_key_length,
4016 );
4017
4018 self.ephemeral.enabled = parse_bool_env("EPHEMERAL_ENABLED", self.ephemeral.enabled);
4019
4020 self.echo_control.enabled =
4021 parse_bool_env("ECHO_CONTROL_ENABLED", self.echo_control.enabled);
4022 self.echo_control.default_echo_messages = parse_bool_env(
4023 "ECHO_CONTROL_DEFAULT_ECHO_MESSAGES",
4024 self.echo_control.default_echo_messages,
4025 );
4026
4027 self.event_name_filtering.enabled = parse_bool_env(
4028 "EVENT_NAME_FILTERING_ENABLED",
4029 self.event_name_filtering.enabled,
4030 );
4031 self.event_name_filtering.max_events_per_filter = parse_env::<usize>(
4032 "EVENT_NAME_FILTERING_MAX_EVENTS_PER_FILTER",
4033 self.event_name_filtering.max_events_per_filter,
4034 );
4035 self.event_name_filtering.max_event_name_length = parse_env::<usize>(
4036 "EVENT_NAME_FILTERING_MAX_EVENT_NAME_LENGTH",
4037 self.event_name_filtering.max_event_name_length,
4038 );
4039 self.versioned_messages.enabled = parse_bool_env(
4040 "VERSIONED_MESSAGES_ENABLED",
4041 self.versioned_messages.enabled,
4042 );
4043 if let Ok(driver_str) = std::env::var("VERSIONED_MESSAGES_DRIVER") {
4044 self.versioned_messages.driver = parse_driver_enum(
4045 driver_str,
4046 self.versioned_messages.driver.clone(),
4047 "VersionedMessages Backend",
4048 );
4049 }
4050 if let Ok(driver_str) = std::env::var("PUSH_STORAGE_DRIVER") {
4051 self.push.storage_driver = parse_driver_enum(
4052 driver_str,
4053 self.push.storage_driver.clone(),
4054 "Push Storage Backend",
4055 );
4056 }
4057 if let Ok(driver_str) = std::env::var("PUSH_QUEUE_DRIVER") {
4058 self.push.queue_driver = parse_driver_enum(
4059 driver_str,
4060 self.push.queue_driver.clone(),
4061 "Push Queue Backend",
4062 );
4063 }
4064 self.push.fcm_enabled = parse_bool_env("PUSH_FCM_ENABLED", self.push.fcm_enabled);
4065 self.push.apns_enabled = parse_bool_env("PUSH_APNS_ENABLED", self.push.apns_enabled);
4066 self.push.webpush_enabled =
4067 parse_bool_env("PUSH_WEBPUSH_ENABLED", self.push.webpush_enabled);
4068 self.push.hms_enabled = parse_bool_env("PUSH_HMS_ENABLED", self.push.hms_enabled);
4069 self.push.wns_enabled = parse_bool_env("PUSH_WNS_ENABLED", self.push.wns_enabled);
4070 if let Some(key) = parse_env_optional::<String>("PUSH_CREDENTIAL_ENCRYPTION_KEY") {
4071 self.push.credential_encryption_key = Some(key);
4072 }
4073 self.push.fanout_fast_threshold = parse_env::<u64>(
4074 "PUSH_FANOUT_FAST_THRESHOLD",
4075 self.push.fanout_fast_threshold,
4076 );
4077 self.push.fanout_shard_size =
4078 parse_env::<u64>("PUSH_FANOUT_SHARD_SIZE", self.push.fanout_shard_size);
4079 self.push.fanout_sync_threshold = parse_env::<u64>(
4080 "PUSH_FANOUT_SYNC_THRESHOLD",
4081 self.push.fanout_sync_threshold,
4082 );
4083 self.push.backpressure_lag_threshold_secs = parse_env::<u64>(
4084 "PUSH_BACKPRESSURE_LAG_THRESHOLD_SECS",
4085 self.push.backpressure_lag_threshold_secs,
4086 );
4087 self.push.publish_status_ttl_days = parse_env::<u64>(
4088 "PUSH_PUBLISH_STATUS_TTL_DAYS",
4089 self.push.publish_status_ttl_days,
4090 );
4091 self.push.circuit_breaker.failure_threshold = parse_env::<u32>(
4092 "PUSH_FAILURE_THRESHOLD",
4093 self.push.circuit_breaker.failure_threshold,
4094 );
4095 self.push.scheduler_interval_secs = parse_env::<u64>(
4096 "PUSH_SCHEDULER_INTERVAL_SECS",
4097 self.push.scheduler_interval_secs,
4098 );
4099 self.push.stale_device_max_age_days = parse_env::<u64>(
4100 "PUSH_STALE_DEVICE_MAX_AGE_DAYS",
4101 self.push.stale_device_max_age_days,
4102 );
4103 self.push.analytics_enabled =
4104 parse_bool_env("PUSH_ANALYTICS_ENABLED", self.push.analytics_enabled);
4105 self.push.default_quotas.acceptance_rps = parse_env::<u64>(
4106 "PUSH_DEFAULT_ACCEPTANCE_RPS",
4107 self.push.default_quotas.acceptance_rps,
4108 );
4109 self.push.default_quotas.delivery_quota_daily = parse_env::<u64>(
4110 "PUSH_DEFAULT_DELIVERY_QUOTA_DAILY",
4111 self.push.default_quotas.delivery_quota_daily,
4112 );
4113 self.push.default_quotas.fanout_max = parse_env::<u64>(
4114 "PUSH_DEFAULT_FANOUT_MAX",
4115 self.push.default_quotas.fanout_max,
4116 );
4117 self.push.default_quotas.inflight_max = parse_env::<u64>(
4118 "PUSH_DEFAULT_INFLIGHT_MAX",
4119 self.push.default_quotas.inflight_max,
4120 );
4121 self.versioned_messages.max_page_size = parse_env::<usize>(
4122 "VERSIONED_MESSAGES_MAX_PAGE_SIZE",
4123 self.versioned_messages.max_page_size,
4124 );
4125 self.versioned_messages.retention_window_seconds = parse_env::<u64>(
4126 "VERSIONED_MESSAGES_RETENTION_WINDOW_SECONDS",
4127 self.versioned_messages.retention_window_seconds,
4128 );
4129 self.versioned_messages.purge_interval_seconds = parse_env::<u64>(
4130 "VERSIONED_MESSAGES_PURGE_INTERVAL_SECONDS",
4131 self.versioned_messages.purge_interval_seconds,
4132 );
4133 self.versioned_messages.purge_batch_size = parse_env::<usize>(
4134 "VERSIONED_MESSAGES_PURGE_BATCH_SIZE",
4135 self.versioned_messages.purge_batch_size,
4136 );
4137 self.versioned_messages.max_purge_per_tick = parse_env::<usize>(
4138 "VERSIONED_MESSAGES_MAX_PURGE_PER_TICK",
4139 self.versioned_messages.max_purge_per_tick,
4140 );
4141 self.annotations.enabled = parse_bool_env("ANNOTATIONS_ENABLED", self.annotations.enabled);
4142
4143 Ok(())
4144 }
4145
4146 pub fn validate(&self) -> Result<(), String> {
4147 if self.unix_socket.enabled {
4148 if self.unix_socket.path.is_empty() {
4149 return Err(
4150 "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
4151 );
4152 }
4153
4154 self.validate_unix_socket_security()?;
4155
4156 if self.ssl.enabled {
4157 tracing::warn!(
4158 "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
4159 );
4160 }
4161
4162 if self.unix_socket.permission_mode > 0o777 {
4163 return Err(format!(
4164 "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
4165 self.unix_socket.permission_mode
4166 ));
4167 }
4168 }
4169
4170 if let Err(e) = self.cleanup.validate() {
4171 return Err(format!("Invalid cleanup configuration: {}", e));
4172 }
4173
4174 if self.history.enabled {
4175 if self.history.max_page_size == 0 {
4176 return Err("history.max_page_size must be greater than 0".to_string());
4177 }
4178 if self.history.writer_shards == 0 {
4179 return Err("history.writer_shards must be greater than 0".to_string());
4180 }
4181 if self.history.writer_queue_capacity == 0 {
4182 return Err("history.writer_queue_capacity must be greater than 0".to_string());
4183 }
4184 if self.history.retention_window_seconds == 0 {
4185 return Err("history.retention_window_seconds must be greater than 0".to_string());
4186 }
4187 if self.history.postgres.table_prefix.trim().is_empty() {
4188 return Err("history.postgres.table_prefix must not be empty".to_string());
4189 }
4190 }
4191
4192 if self.presence_history.enabled {
4193 if self.presence_history.max_page_size == 0 {
4194 return Err("presence_history.max_page_size must be greater than 0".to_string());
4195 }
4196 if self.presence_history.retention_window_seconds == 0 {
4197 return Err(
4198 "presence_history.retention_window_seconds must be greater than 0".to_string(),
4199 );
4200 }
4201 }
4202
4203 if self.versioned_messages.enabled && self.versioned_messages.max_page_size == 0 {
4204 return Err("versioned_messages.max_page_size must be greater than 0".to_string());
4205 }
4206 if self.annotations.enabled && !self.versioned_messages.enabled {
4207 return Err("annotations require versioned_messages.enabled".to_string());
4208 }
4209
4210 if self.adapter.nats.presence_sync_chunk_size == Some(0) {
4211 return Err("nats.presence_sync_chunk_size must be > 0 when set".to_string());
4212 }
4213
4214 Ok(())
4215 }
4216
4217 fn validate_unix_socket_security(&self) -> Result<(), String> {
4218 let path = &self.unix_socket.path;
4219
4220 if path.contains("../") || path.contains("..\\") {
4221 return Err(
4222 "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
4223 );
4224 }
4225
4226 if self.unix_socket.permission_mode & 0o002 != 0 {
4227 warn!(
4228 "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
4229 self.unix_socket.permission_mode
4230 );
4231 }
4232
4233 if self.unix_socket.permission_mode & 0o007 > 0o005 {
4234 warn!(
4235 "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
4236 self.unix_socket.permission_mode
4237 );
4238 }
4239
4240 if self.mode == "production" && path.starts_with("/tmp/") {
4241 warn!(
4242 "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
4243 path
4244 );
4245 }
4246
4247 if !path.starts_with('/') {
4248 return Err(
4249 "Unix socket path must be absolute (start with /) for security and reliability."
4250 .to_string(),
4251 );
4252 }
4253
4254 Ok(())
4255 }
4256}
4257
/// URL-building tests for `RedisConnection`: standalone, sentinel and cluster
/// seed/node URL generation.
#[cfg(test)]
mod redis_connection_tests {
    use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};

    /// Fully spelled-out standalone connection that the URL tests start from.
    ///
    /// Every field is set explicitly (instead of relying on `Default`) so the
    /// expected URLs below do not depend on the crate's default values.
    fn standalone() -> RedisConnection {
        RedisConnection {
            host: String::from("127.0.0.1"),
            port: 6379,
            db: 0,
            username: None,
            password: None,
            key_prefix: String::from("sockudo:"),
            sentinels: vec![],
            sentinel_password: None,
            name: String::from("mymaster"),
            cluster: RedisClusterConnection::default(),
            cluster_nodes: vec![],
        }
    }

    #[test]
    fn test_standard_url_basic() {
        assert_eq!(standalone().to_url(), "redis://127.0.0.1:6379/0");
    }

    #[test]
    fn test_standard_url_with_password() {
        let with_password = RedisConnection {
            db: 2,
            password: Some(String::from("secret")),
            ..standalone()
        };
        assert_eq!(with_password.to_url(), "redis://:secret@127.0.0.1:6379/2");
    }

    #[test]
    fn test_standard_url_with_username_and_password() {
        let with_credentials = RedisConnection {
            host: String::from("redis.example.com"),
            port: 6380,
            db: 1,
            username: Some(String::from("admin")),
            password: Some(String::from("pass123")),
            ..standalone()
        };
        assert_eq!(
            with_credentials.to_url(),
            "redis://admin:pass123@redis.example.com:6380/1"
        );
    }

    #[test]
    fn test_standard_url_with_special_chars_in_password() {
        // '@' and '#' must come out percent-encoded in the userinfo part.
        let with_reserved_chars = RedisConnection {
            password: Some(String::from("pass@word#123")),
            ..standalone()
        };
        assert_eq!(
            with_reserved_chars.to_url(),
            "redis://:pass%40word%23123@127.0.0.1:6379/0"
        );
    }

    #[test]
    fn test_is_sentinel_configured_false() {
        assert!(!RedisConnection::default().is_sentinel_configured());
    }

    #[test]
    fn test_is_sentinel_configured_true() {
        let single_sentinel = RedisSentinel {
            host: String::from("sentinel1"),
            port: 26379,
        };
        let conn = RedisConnection {
            sentinels: vec![single_sentinel],
            ..RedisConnection::default()
        };
        assert!(conn.is_sentinel_configured());
    }

    #[test]
    fn test_sentinel_url_basic() {
        let two_sentinels = RedisConnection {
            sentinels: vec![
                RedisSentinel {
                    host: String::from("sentinel1"),
                    port: 26379,
                },
                RedisSentinel {
                    host: String::from("sentinel2"),
                    port: 26379,
                },
            ],
            ..standalone()
        };
        assert_eq!(
            two_sentinels.to_url(),
            "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0"
        );
    }

    #[test]
    fn test_sentinel_url_with_sentinel_password() {
        let conn = RedisConnection {
            sentinels: vec![RedisSentinel {
                host: String::from("sentinel1"),
                port: 26379,
            }],
            sentinel_password: Some(String::from("sentinelpass")),
            ..standalone()
        };
        assert_eq!(
            conn.to_url(),
            "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0"
        );
    }

    #[test]
    fn test_sentinel_url_with_master_password() {
        let conn = RedisConnection {
            db: 1,
            password: Some(String::from("masterpass")),
            sentinels: vec![RedisSentinel {
                host: String::from("sentinel1"),
                port: 26379,
            }],
            ..standalone()
        };
        assert_eq!(
            conn.to_url(),
            "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass"
        );
    }

    #[test]
    fn test_sentinel_url_with_all_auth() {
        let conn = RedisConnection {
            db: 2,
            username: Some(String::from("redisuser")),
            password: Some(String::from("redispass")),
            sentinels: vec![
                RedisSentinel {
                    host: String::from("sentinel1"),
                    port: 26379,
                },
                RedisSentinel {
                    host: String::from("sentinel2"),
                    port: 26380,
                },
            ],
            sentinel_password: Some(String::from("sentinelauth")),
            name: String::from("production-master"),
            ..standalone()
        };
        assert_eq!(
            conn.to_url(),
            "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser"
        );
    }

    #[test]
    fn test_sentinel_to_host_port() {
        let sentinel = RedisSentinel {
            host: String::from("sentinel.example.com"),
            port: 26379,
        };
        assert_eq!(sentinel.to_host_port(), "sentinel.example.com:26379");
    }

    #[test]
    fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
        // Mix of a bare host, an explicit `redis://` URL and a `rediss://`
        // URL without a port, all sharing the cluster-level password.
        let cluster = RedisClusterConnection {
            nodes: vec![
                ClusterNode {
                    host: String::from("node1.secure-cluster.com"),
                    port: 7000,
                },
                ClusterNode {
                    host: String::from("redis://node2.secure-cluster.com:7001"),
                    port: 7001,
                },
                ClusterNode {
                    host: String::from("rediss://node3.secure-cluster.com"),
                    port: 7002,
                },
            ],
            username: None,
            password: Some(String::from("cluster-secret")),
            use_tls: true,
        };
        let conn = RedisConnection {
            cluster,
            ..RedisConnection::default()
        };

        let urls = conn.cluster_node_urls();
        assert_eq!(
            urls,
            vec![
                "rediss://:cluster-secret@node1.secure-cluster.com:7000",
                "redis://:cluster-secret@node2.secure-cluster.com:7001",
                "rediss://:cluster-secret@node3.secure-cluster.com:7002",
            ]
        );
    }

    #[test]
    fn test_cluster_node_urls_fallback_to_legacy_nodes() {
        let legacy_node = ClusterNode {
            host: String::from("legacy-node.example.com"),
            port: 7000,
        };
        let conn = RedisConnection {
            password: Some(String::from("fallback-secret")),
            cluster_nodes: vec![legacy_node],
            ..RedisConnection::default()
        };

        let urls = conn.cluster_node_urls();
        assert_eq!(
            urls,
            vec!["redis://:fallback-secret@legacy-node.example.com:7000"]
        );
    }

    #[test]
    fn test_normalize_cluster_seed_urls() {
        let cluster = RedisClusterConnection {
            nodes: vec![],
            username: Some(String::from("svc-user")),
            password: Some(String::from("svc-pass")),
            use_tls: true,
        };
        let conn = RedisConnection {
            cluster,
            ..RedisConnection::default()
        };

        let seeds: Vec<String> = [
            "node1.example.com:7000",
            "redis://node2.example.com:7001",
            "rediss://node3.example.com",
        ]
        .iter()
        .map(|seed| seed.to_string())
        .collect();

        let normalized = conn.normalize_cluster_seed_urls(&seeds);
        assert_eq!(
            normalized,
            vec![
                "rediss://svc-user:svc-pass@node1.example.com:7000",
                "redis://svc-user:svc-pass@node2.example.com:7001",
                "rediss://svc-user:svc-pass@node3.example.com:6379",
            ]
        );
    }
}
4552
/// Tests for `ClusterNode` URL rendering (`to_url`, `to_url_with_options`)
/// and seed parsing (`from_seed`).
#[cfg(test)]
mod cluster_node_tests {
    use super::ClusterNode;

    /// Asserts that `node.to_url()` renders exactly `expected`.
    ///
    /// `#[track_caller]` keeps the reported failure location inside the
    /// calling test rather than inside this helper.
    #[track_caller]
    fn assert_url(node: ClusterNode, expected: &str) {
        assert_eq!(node.to_url(), expected);
    }

    #[test]
    fn test_to_url_basic_host() {
        assert_url(
            ClusterNode {
                host: "localhost".into(),
                port: 6379,
            },
            "redis://localhost:6379",
        );
    }

    #[test]
    fn test_to_url_ip_address() {
        assert_url(
            ClusterNode {
                host: "127.0.0.1".into(),
                port: 6379,
            },
            "redis://127.0.0.1:6379",
        );
    }

    #[test]
    fn test_to_url_with_redis_protocol() {
        assert_url(
            ClusterNode {
                host: "redis://example.com".into(),
                port: 6379,
            },
            "redis://example.com:6379",
        );
    }

    #[test]
    fn test_to_url_with_rediss_protocol() {
        assert_url(
            ClusterNode {
                host: "rediss://secure.example.com".into(),
                port: 6379,
            },
            "rediss://secure.example.com:6379",
        );
    }

    #[test]
    fn test_to_url_with_rediss_protocol_and_port_in_url() {
        // The port embedded in the URL is expected to win over the `port` field.
        assert_url(
            ClusterNode {
                host: "rediss://secure.example.com:7000".into(),
                port: 6379,
            },
            "rediss://secure.example.com:7000",
        );
    }

    #[test]
    fn test_to_url_with_redis_protocol_and_port_in_url() {
        assert_url(
            ClusterNode {
                host: "redis://example.com:7001".into(),
                port: 6379,
            },
            "redis://example.com:7001",
        );
    }

    #[test]
    fn test_to_url_with_trailing_whitespace() {
        // Surrounding whitespace in the host field must not leak into the URL.
        assert_url(
            ClusterNode {
                host: " rediss://secure.example.com ".into(),
                port: 6379,
            },
            "rediss://secure.example.com:6379",
        );
    }

    #[test]
    fn test_to_url_custom_port() {
        assert_url(
            ClusterNode {
                host: "redis-cluster.example.com".into(),
                port: 7000,
            },
            "redis://redis-cluster.example.com:7000",
        );
    }

    #[test]
    fn test_to_url_plain_host_with_port_in_host_field() {
        assert_url(
            ClusterNode {
                host: "redis-cluster.example.com:7010".into(),
                port: 7000,
            },
            "redis://redis-cluster.example.com:7010",
        );
    }

    #[test]
    fn test_to_url_with_options_adds_auth_and_tls() {
        let plain_node = ClusterNode {
            host: "node.example.com".into(),
            port: 7000,
        };
        let url = plain_node.to_url_with_options(true, Some("svc-user"), Some("secret"));
        assert_eq!(url, "rediss://svc-user:secret@node.example.com:7000");
    }

    #[test]
    fn test_to_url_with_options_keeps_embedded_auth() {
        // Credentials already present in the host URL take precedence over
        // the ones passed as options.
        let node_with_auth = ClusterNode {
            host: "rediss://:node-secret@node.example.com:7000".into(),
            port: 7000,
        };
        let url =
            node_with_auth.to_url_with_options(true, Some("global-user"), Some("global-secret"));
        assert_eq!(url, "rediss://:node-secret@node.example.com:7000");
    }

    #[test]
    fn test_from_seed_parses_plain_host_port() {
        let parsed = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
        assert_eq!(parsed.host, "cluster-node-1");
        assert_eq!(parsed.port, 7005);
    }

    #[test]
    fn test_from_seed_keeps_scheme_urls() {
        let seed = "rediss://secure.example.com:7005";
        let parsed = ClusterNode::from_seed(seed).expect("node should parse");
        assert_eq!(parsed.host, "rediss://secure.example.com:7005");
        assert_eq!(parsed.port, 7005);
    }

    #[test]
    fn test_to_url_aws_elasticache_hostname() {
        assert_url(
            ClusterNode {
                host: "rediss://my-cluster.use1.cache.amazonaws.com".into(),
                port: 6379,
            },
            "rediss://my-cluster.use1.cache.amazonaws.com:6379",
        );
    }

    #[test]
    fn test_to_url_with_ipv6_no_port() {
        assert_url(
            ClusterNode {
                host: "rediss://[::1]".into(),
                port: 6379,
            },
            "rediss://[::1]:6379",
        );
    }

    #[test]
    fn test_to_url_with_ipv6_and_port_in_url() {
        assert_url(
            ClusterNode {
                host: "rediss://[::1]:7000".into(),
                port: 6379,
            },
            "rediss://[::1]:7000",
        );
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_no_port() {
        assert_url(
            ClusterNode {
                host: "rediss://[2001:db8::1]".into(),
                port: 6379,
            },
            "rediss://[2001:db8::1]:6379",
        );
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_with_port() {
        assert_url(
            ClusterNode {
                host: "rediss://[2001:db8::1]:7000".into(),
                port: 6379,
            },
            "rediss://[2001:db8::1]:7000",
        );
    }

    #[test]
    fn test_to_url_with_redis_protocol_ipv6() {
        assert_url(
            ClusterNode {
                host: "redis://[::1]".into(),
                port: 6379,
            },
            "redis://[::1]:6379",
        );
    }
}
4734
/// Deserialization tests for `CorsConfig`: which `origin` entries are
/// accepted and which are rejected while parsing.
#[cfg(test)]
mod cors_config_tests {
    use super::CorsConfig;

    /// Parses a JSON document into a `CorsConfig` via `sonic_rs`.
    fn cors_from_json(json: &str) -> sonic_rs::Result<CorsConfig> {
        sonic_rs::from_str(json)
    }

    #[test]
    fn test_deserialize_valid_exact_origins() {
        let json = r#"{"origin": ["https://example.com", "https://other.com"]}"#;
        let parsed = cors_from_json(json).unwrap();
        let origin_count = parsed.origin.len();
        assert_eq!(origin_count, 2);
    }

    #[test]
    fn test_deserialize_valid_wildcard_patterns() {
        let json = r#"{"origin": ["*.example.com", "https://*.staging.example.com"]}"#;
        let parsed = cors_from_json(json).unwrap();
        let origin_count = parsed.origin.len();
        assert_eq!(origin_count, 2);
    }

    #[test]
    fn test_deserialize_allows_special_markers() {
        let accepted = [
            r#"{"origin": ["*"]}"#,
            r#"{"origin": ["Any"]}"#,
            r#"{"origin": ["any"]}"#,
            r#"{"origin": ["*", "https://example.com"]}"#,
        ];
        for json in accepted.iter() {
            assert!(
                cors_from_json(json).is_ok(),
                "expected CORS config to be accepted: {}",
                json
            );
        }
    }

    #[test]
    fn test_deserialize_rejects_invalid_patterns() {
        let rejected = [
            r#"{"origin": ["*.*bad"]}"#,
            r#"{"origin": ["*."]}"#,
            r#"{"origin": [""]}"#,
            r#"{"origin": ["https://"]}"#,
        ];
        for json in rejected.iter() {
            assert!(
                cors_from_json(json).is_err(),
                "expected CORS config to be rejected: {}",
                json
            );
        }
    }

    #[test]
    fn test_deserialize_rejects_mixed_valid_and_invalid() {
        // One bad entry is enough to fail the whole list.
        let json = r#"{"origin": ["https://good.com", "*.*bad"]}"#;
        assert!(cors_from_json(json).is_err());
    }
}