1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12 D: serde::Deserializer<'de>,
13{
14 use serde::de::{self};
15
16 let s = String::deserialize(deserializer)?;
17
18 if !s.chars().all(|c| c.is_digit(8)) {
20 return Err(de::Error::custom(format!(
21 "invalid octal permission mode '{}': must contain only digits 0-7",
22 s
23 )));
24 }
25
26 let mode = u32::from_str_radix(&s, 8)
28 .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30 if mode > 0o777 {
32 return Err(de::Error::custom(format!(
33 "permission mode '{}' exceeds maximum value 777",
34 s
35 )));
36 }
37
38 Ok(mode)
39}
40
41fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43 driver_str: String,
44 default_driver: T,
45 driver_name: &str,
46) -> T
47where
48 <T as FromStr>::Err: std::fmt::Debug,
49{
50 match T::from_str(&driver_str.to_lowercase()) {
51 Ok(driver_enum) => driver_enum,
52 Err(e) => {
53 warn!(
54 "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55 driver_name, driver_str, e, default_driver
56 );
57 default_driver
58 }
59 }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63 if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64 db_conn.pool_min = Some(min);
65 }
66 if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67 db_conn.pool_max = Some(max);
68 }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
74#[serde(rename_all = "lowercase")]
75pub enum AdapterDriver {
76 #[default]
77 Local,
78 Redis,
79 #[serde(rename = "redis-cluster")]
80 RedisCluster,
81 Nats,
82 Pulsar,
83 RabbitMq,
84 #[serde(rename = "google-pubsub")]
85 GooglePubSub,
86 Kafka,
87 Iggy,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91#[serde(default)]
92pub struct DynamoDbSettings {
93 pub region: String,
94 pub table_name: String,
95 pub endpoint_url: Option<String>,
96 pub aws_access_key_id: Option<String>,
97 pub aws_secret_access_key: Option<String>,
98 pub aws_profile_name: Option<String>,
99}
100
101impl Default for DynamoDbSettings {
102 fn default() -> Self {
103 Self {
104 region: "us-east-1".to_string(),
105 table_name: "sockudo-applications".to_string(),
106 endpoint_url: None,
107 aws_access_key_id: None,
108 aws_secret_access_key: None,
109 aws_profile_name: None,
110 }
111 }
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
115#[serde(default)]
116pub struct ScyllaDbSettings {
117 pub nodes: Vec<String>,
118 pub keyspace: String,
119 pub table_name: String,
120 pub username: Option<String>,
121 pub password: Option<String>,
122 pub replication_class: String,
123 pub replication_factor: u32,
124}
125
126impl Default for ScyllaDbSettings {
127 fn default() -> Self {
128 Self {
129 nodes: vec!["127.0.0.1:9042".to_string()],
130 keyspace: "sockudo".to_string(),
131 table_name: "applications".to_string(),
132 username: None,
133 password: None,
134 replication_class: "SimpleStrategy".to_string(),
135 replication_factor: 3,
136 }
137 }
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize)]
141#[serde(default)]
142pub struct SurrealDbSettings {
143 pub url: String,
144 pub namespace: String,
145 pub database: String,
146 pub username: String,
147 pub password: String,
148 pub table_name: String,
149 pub cache_ttl: u64,
150 pub cache_max_capacity: u64,
151}
152
153impl Default for SurrealDbSettings {
154 fn default() -> Self {
155 Self {
156 url: "ws://127.0.0.1:8000".to_string(),
157 namespace: "sockudo".to_string(),
158 database: "sockudo".to_string(),
159 username: "root".to_string(),
160 password: "root".to_string(),
161 table_name: "applications".to_string(),
162 cache_ttl: 300,
163 cache_max_capacity: 100,
164 }
165 }
166}
167
168impl FromStr for AdapterDriver {
169 type Err = String;
170 fn from_str(s: &str) -> Result<Self, Self::Err> {
171 match s.to_lowercase().as_str() {
172 "local" => Ok(AdapterDriver::Local),
173 "redis" => Ok(AdapterDriver::Redis),
174 "redis-cluster" => Ok(AdapterDriver::RedisCluster),
175 "nats" => Ok(AdapterDriver::Nats),
176 "pulsar" => Ok(AdapterDriver::Pulsar),
177 "rabbitmq" | "rabbit-mq" => Ok(AdapterDriver::RabbitMq),
178 "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(AdapterDriver::GooglePubSub),
179 "kafka" => Ok(AdapterDriver::Kafka),
180 "iggy" | "apache-iggy" | "apache_iggy" => Ok(AdapterDriver::Iggy),
181 _ => Err(format!("Unknown adapter driver: {s}")),
182 }
183 }
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
187#[serde(rename_all = "lowercase")]
188pub enum AppManagerDriver {
189 #[default]
190 Memory,
191 Mysql,
192 Dynamodb,
193 PgSql,
194 SurrealDb,
195 ScyllaDb,
196}
197impl FromStr for AppManagerDriver {
198 type Err = String;
199 fn from_str(s: &str) -> Result<Self, Self::Err> {
200 match s.to_lowercase().as_str() {
201 "memory" => Ok(AppManagerDriver::Memory),
202 "mysql" => Ok(AppManagerDriver::Mysql),
203 "dynamodb" => Ok(AppManagerDriver::Dynamodb),
204 "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
205 "surreal" | "surrealdb" => Ok(AppManagerDriver::SurrealDb),
206 "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
207 _ => Err(format!("Unknown app manager driver: {s}")),
208 }
209 }
210}
211
212#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
213#[serde(rename_all = "lowercase")]
214pub enum CacheDriver {
215 #[default]
216 Memory,
217 Redis,
218 #[serde(rename = "redis-cluster")]
219 RedisCluster,
220 None,
221}
222
223impl FromStr for CacheDriver {
224 type Err = String;
225 fn from_str(s: &str) -> Result<Self, Self::Err> {
226 match s.to_lowercase().as_str() {
227 "memory" => Ok(CacheDriver::Memory),
228 "redis" => Ok(CacheDriver::Redis),
229 "redis-cluster" => Ok(CacheDriver::RedisCluster),
230 "none" => Ok(CacheDriver::None),
231 _ => Err(format!("Unknown cache driver: {s}")),
232 }
233 }
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
237#[serde(rename_all = "lowercase")]
238pub enum QueueDriver {
239 Memory,
240 #[default]
241 Redis,
242 #[serde(rename = "redis-cluster")]
243 RedisCluster,
244 Nats,
245 Pulsar,
246 RabbitMq,
247 #[serde(rename = "google-pubsub")]
248 GooglePubSub,
249 Kafka,
250 Iggy,
251 Sqs,
252 Sns,
253 None,
254}
255
256impl FromStr for QueueDriver {
257 type Err = String;
258 fn from_str(s: &str) -> Result<Self, Self::Err> {
259 match s.to_lowercase().as_str() {
260 "memory" => Ok(QueueDriver::Memory),
261 "redis" => Ok(QueueDriver::Redis),
262 "redis-cluster" => Ok(QueueDriver::RedisCluster),
263 "nats" => Ok(QueueDriver::Nats),
264 "pulsar" => Ok(QueueDriver::Pulsar),
265 "rabbitmq" | "rabbit-mq" => Ok(QueueDriver::RabbitMq),
266 "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(QueueDriver::GooglePubSub),
267 "kafka" => Ok(QueueDriver::Kafka),
268 "iggy" | "apache-iggy" | "apache_iggy" => Ok(QueueDriver::Iggy),
269 "sqs" => Ok(QueueDriver::Sqs),
270 "sns" => Ok(QueueDriver::Sns),
271 "none" => Ok(QueueDriver::None),
272 _ => Err(format!("Unknown queue driver: {s}")),
273 }
274 }
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
278#[serde(rename_all = "snake_case")]
279pub enum DeltaCoordinationBackend {
280 #[default]
281 Auto,
282 None,
283 Redis,
284 RedisCluster,
285 Nats,
286}
287
288impl FromStr for DeltaCoordinationBackend {
289 type Err = String;
290
291 fn from_str(s: &str) -> Result<Self, Self::Err> {
292 match s.trim().to_ascii_lowercase().as_str() {
293 "auto" => Ok(Self::Auto),
294 "none" => Ok(Self::None),
295 "redis" => Ok(Self::Redis),
296 "redis-cluster" | "redis_cluster" => Ok(Self::RedisCluster),
297 "nats" => Ok(Self::Nats),
298 _ => Err(format!("Unknown delta coordination backend: {s}")),
299 }
300 }
301}
302
303impl AsRef<str> for QueueDriver {
304 fn as_ref(&self) -> &str {
305 match self {
306 QueueDriver::Memory => "memory",
307 QueueDriver::Redis => "redis",
308 QueueDriver::RedisCluster => "redis-cluster",
309 QueueDriver::Nats => "nats",
310 QueueDriver::Pulsar => "pulsar",
311 QueueDriver::RabbitMq => "rabbitmq",
312 QueueDriver::GooglePubSub => "google-pubsub",
313 QueueDriver::Kafka => "kafka",
314 QueueDriver::Iggy => "iggy",
315 QueueDriver::Sqs => "sqs",
316 QueueDriver::Sns => "sns",
317 QueueDriver::None => "none",
318 }
319 }
320}
321
322#[derive(Debug, Clone, Serialize, Deserialize)]
323#[serde(default)]
324pub struct RedisClusterQueueConfig {
325 pub concurrency: u32,
326 pub prefix: Option<String>,
327 pub nodes: Vec<String>,
328 pub request_timeout_ms: u64,
329}
330
331impl Default for RedisClusterQueueConfig {
332 fn default() -> Self {
333 Self {
334 concurrency: 5,
335 prefix: Some("sockudo_queue:".to_string()),
336 nodes: vec!["redis://127.0.0.1:6379".to_string()],
337 request_timeout_ms: 5000,
338 }
339 }
340}
341
342#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
343#[serde(rename_all = "lowercase")]
344pub enum MetricsDriver {
345 #[default]
346 Prometheus,
347}
348
349#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
350#[serde(rename_all = "lowercase")]
351pub enum LogOutputFormat {
352 #[default]
353 Human,
354 Json,
355}
356
357impl FromStr for LogOutputFormat {
358 type Err = String;
359 fn from_str(s: &str) -> Result<Self, Self::Err> {
360 match s.to_lowercase().as_str() {
361 "human" => Ok(LogOutputFormat::Human),
362 "json" => Ok(LogOutputFormat::Json),
363 _ => Err(format!("Unknown log output format: {s}")),
364 }
365 }
366}
367
368impl FromStr for MetricsDriver {
369 type Err = String;
370 fn from_str(s: &str) -> Result<Self, Self::Err> {
371 match s.to_lowercase().as_str() {
372 "prometheus" => Ok(MetricsDriver::Prometheus),
373 _ => Err(format!("Unknown metrics driver: {s}")),
374 }
375 }
376}
377
378impl AsRef<str> for MetricsDriver {
379 fn as_ref(&self) -> &str {
380 match self {
381 MetricsDriver::Prometheus => "prometheus",
382 }
383 }
384}
385
386#[derive(Debug, Clone, Serialize, Deserialize)]
388#[serde(default)]
389pub struct ServerOptions {
390 pub adapter: AdapterConfig,
391 pub app_manager: AppManagerConfig,
392 pub cache: CacheConfig,
393 pub channel_limits: ChannelLimits,
394 pub cors: CorsConfig,
395 pub database: DatabaseConfig,
396 pub database_pooling: DatabasePooling,
397 pub debug: bool,
398 pub event_limits: EventLimits,
399 pub host: String,
400 pub http_api: HttpApiConfig,
401 pub instance: InstanceConfig,
402 pub logging: Option<LoggingConfig>,
403 pub metrics: MetricsConfig,
404 pub mode: String,
405 pub port: u16,
406 pub path_prefix: String,
407 pub presence: PresenceConfig,
408 pub queue: QueueConfig,
409 pub rate_limiter: RateLimiterConfig,
410 pub shutdown_grace_period: u64,
411 pub ssl: SslConfig,
412 pub user_authentication_timeout: u64,
413 pub webhooks: WebhooksConfig,
414 pub websocket_max_payload_kb: u32,
415 pub cleanup: CleanupConfig,
416 pub activity_timeout: u64,
417 pub cluster_health: ClusterHealthConfig,
418 pub unix_socket: UnixSocketConfig,
419 pub delta_compression: DeltaCompressionOptionsConfig,
420 pub tag_filtering: TagFilteringConfig,
421 pub websocket: WebSocketConfig,
422 pub connection_recovery: ConnectionRecoveryConfig,
423 pub history: HistoryConfig,
424 pub presence_history: PresenceHistoryConfig,
425 pub idempotency: IdempotencyConfig,
426 pub ephemeral: EphemeralConfig,
427 pub echo_control: EchoControlConfig,
428 pub event_name_filtering: EventNameFilteringConfig,
429 pub versioned_messages: VersionedMessagesConfig,
430 pub annotations: AnnotationsConfig,
431}
432
433#[derive(Debug, Clone, Serialize, Deserialize)]
436#[serde(default)]
437pub struct SqsQueueConfig {
438 pub region: String,
439 pub queue_url_prefix: Option<String>,
440 pub visibility_timeout: i32,
441 pub endpoint_url: Option<String>,
442 pub max_messages: i32,
443 pub wait_time_seconds: i32,
444 pub concurrency: u32,
445 pub fifo: bool,
446 pub message_group_id: Option<String>,
447}
448
449#[derive(Debug, Clone, Serialize, Deserialize)]
450#[serde(default)]
451pub struct SnsQueueConfig {
452 pub region: String,
453 pub topic_arn: String,
454 pub endpoint_url: Option<String>,
455}
456
457#[derive(Debug, Clone, Serialize, Deserialize)]
458#[serde(default)]
459pub struct AdapterConfig {
460 pub driver: AdapterDriver,
461 pub redis: RedisAdapterConfig,
462 pub cluster: RedisClusterAdapterConfig,
463 pub nats: NatsAdapterConfig,
464 pub pulsar: PulsarAdapterConfig,
465 pub rabbitmq: RabbitMqAdapterConfig,
466 pub google_pubsub: GooglePubSubAdapterConfig,
467 pub kafka: KafkaAdapterConfig,
468 pub iggy: IggyConfig,
469 #[serde(default = "default_buffer_multiplier_per_cpu")]
470 pub buffer_multiplier_per_cpu: usize,
471 pub cluster_health: ClusterHealthConfig,
472 #[serde(default = "default_enable_socket_counting")]
473 pub enable_socket_counting: bool,
474 #[serde(default = "default_fallback_to_local")]
475 pub fallback_to_local: bool,
476}
477
478fn default_enable_socket_counting() -> bool {
479 true
480}
481
482fn default_fallback_to_local() -> bool {
483 true
484}
485
486fn default_buffer_multiplier_per_cpu() -> usize {
487 64
488}
489
490impl Default for AdapterConfig {
491 fn default() -> Self {
492 Self {
493 driver: AdapterDriver::default(),
494 redis: RedisAdapterConfig::default(),
495 cluster: RedisClusterAdapterConfig::default(),
496 nats: NatsAdapterConfig::default(),
497 pulsar: PulsarAdapterConfig::default(),
498 rabbitmq: RabbitMqAdapterConfig::default(),
499 google_pubsub: GooglePubSubAdapterConfig::default(),
500 kafka: KafkaAdapterConfig::default(),
501 iggy: IggyConfig::default(),
502 buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
503 cluster_health: ClusterHealthConfig::default(),
504 enable_socket_counting: default_enable_socket_counting(),
505 fallback_to_local: default_fallback_to_local(),
506 }
507 }
508}
509
510#[derive(Debug, Clone, Serialize, Deserialize)]
511#[serde(default)]
512pub struct RedisAdapterConfig {
513 pub requests_timeout: u64,
514 pub prefix: String,
515 pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
516 pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
517 pub cluster_mode: bool,
518}
519
520#[derive(Debug, Clone, Serialize, Deserialize)]
521#[serde(default)]
522pub struct RedisClusterAdapterConfig {
523 pub nodes: Vec<String>,
524 pub prefix: String,
525 pub request_timeout_ms: u64,
526 pub use_connection_manager: bool,
527 #[serde(default)]
528 pub use_sharded_pubsub: bool,
529}
530
531#[derive(Debug, Clone, Serialize, Deserialize)]
532#[serde(default)]
533pub struct NatsAdapterConfig {
534 pub servers: Vec<String>,
535 pub prefix: String,
536 pub request_timeout_ms: u64,
537 pub username: Option<String>,
538 pub password: Option<String>,
539 pub token: Option<String>,
540 pub connection_timeout_ms: u64,
541 pub nodes_number: Option<u32>,
542 pub discovery_max_wait_ms: u64,
543 pub discovery_idle_wait_ms: u64,
544}
545
546#[derive(Debug, Clone, Serialize, Deserialize)]
547#[serde(default)]
548pub struct PulsarAdapterConfig {
549 pub url: String,
550 pub prefix: String,
551 pub request_timeout_ms: u64,
552 pub token: Option<String>,
553 pub nodes_number: Option<u32>,
554}
555
556#[derive(Debug, Clone, Serialize, Deserialize)]
557#[serde(default)]
558pub struct RabbitMqAdapterConfig {
559 pub url: String,
560 pub prefix: String,
561 pub request_timeout_ms: u64,
562 pub connection_timeout_ms: u64,
563 pub nodes_number: Option<u32>,
564}
565
566#[derive(Debug, Clone, Serialize, Deserialize)]
567#[serde(default)]
568pub struct GooglePubSubAdapterConfig {
569 pub project_id: String,
570 pub prefix: String,
571 pub request_timeout_ms: u64,
572 pub emulator_host: Option<String>,
573 pub nodes_number: Option<u32>,
574}
575
576#[derive(Debug, Clone, Serialize, Deserialize)]
577#[serde(default)]
578pub struct KafkaAdapterConfig {
579 pub brokers: Vec<String>,
580 pub prefix: String,
581 pub request_timeout_ms: u64,
582 pub security_protocol: Option<String>,
583 pub sasl_mechanism: Option<String>,
584 pub sasl_username: Option<String>,
585 pub sasl_password: Option<String>,
586 pub nodes_number: Option<u32>,
587}
588
589#[derive(Debug, Clone, Serialize, Deserialize)]
590#[serde(default)]
591pub struct IggyConfig {
592 pub connection_string: String,
593 pub username: Option<String>,
594 pub password: Option<String>,
595 pub consumer_name: Option<String>,
596 pub stream: String,
597 pub topic_prefix: String,
598 pub queue_topic_prefix: String,
599 pub consumer_group_prefix: String,
600 pub request_timeout_ms: u64,
601 pub poll_interval_ms: u64,
602 pub poll_batch_size: u32,
603 pub partitions_count: u32,
604 pub partition_id: u32,
605 pub auto_create: bool,
606 pub start_from_latest: bool,
607 pub nodes_number: Option<u32>,
608}
609
610#[derive(Debug, Clone, Serialize, Deserialize, Default)]
611#[serde(default)]
612pub struct AppManagerConfig {
613 pub driver: AppManagerDriver,
614 pub array: ArrayConfig,
615 pub cache: CacheSettings,
616 pub scylladb: ScyllaDbSettings,
617}
618
619#[derive(Debug, Clone, Serialize, Deserialize, Default)]
620#[serde(default)]
621pub struct ArrayConfig {
622 pub apps: Vec<App>,
623}
624
625#[derive(Debug, Clone, Serialize, Deserialize)]
626#[serde(default)]
627pub struct CacheSettings {
628 pub enabled: bool,
629 pub ttl: u64,
630}
631
632#[derive(Debug, Clone, Serialize, Deserialize)]
633#[serde(default)]
634pub struct MemoryCacheOptions {
635 pub ttl: u64,
636 pub cleanup_interval: u64,
637 pub max_capacity: u64,
638}
639
640#[derive(Debug, Clone, Serialize, Deserialize)]
641#[serde(default)]
642pub struct CacheConfig {
643 pub driver: CacheDriver,
644 pub redis: RedisConfig,
645 pub memory: MemoryCacheOptions,
646}
647
648#[derive(Debug, Clone, Serialize, Deserialize, Default)]
649#[serde(default)]
650pub struct RedisConfig {
651 pub prefix: Option<String>,
652 pub url_override: Option<String>,
653 pub cluster_mode: bool,
654}
655
656#[derive(Debug, Clone, Serialize, Deserialize)]
657#[serde(default)]
658pub struct ChannelLimits {
659 pub max_name_length: u32,
660 pub cache_ttl: u64,
661}
662
663#[derive(Debug, Clone, Serialize, Deserialize)]
664#[serde(default)]
665pub struct CorsConfig {
666 pub credentials: bool,
667 #[serde(deserialize_with = "deserialize_and_validate_cors_origins")]
668 pub origin: Vec<String>,
669 pub methods: Vec<String>,
670 pub allowed_headers: Vec<String>,
671}
672
673fn deserialize_and_validate_cors_origins<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
674where
675 D: serde::Deserializer<'de>,
676{
677 use serde::de::Error;
678 let origins = Vec::<String>::deserialize(deserializer)?;
679
680 if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&origins) {
681 return Err(D::Error::custom(format!(
682 "CORS origin pattern validation failed: {}",
683 e
684 )));
685 }
686
687 Ok(origins)
688}
689
690#[derive(Debug, Clone, Serialize, Deserialize, Default)]
691#[serde(default)]
692pub struct DatabaseConfig {
693 pub mysql: DatabaseConnection,
694 pub postgres: DatabaseConnection,
695 pub redis: RedisConnection,
696 pub dynamodb: DynamoDbSettings,
697 pub surrealdb: SurrealDbSettings,
698 pub scylladb: ScyllaDbSettings,
699}
700
701#[derive(Debug, Clone, Serialize, Deserialize)]
702#[serde(default)]
703pub struct DatabaseConnection {
704 pub host: String,
705 pub port: u16,
706 pub username: String,
707 pub password: String,
708 pub database: String,
709 pub table_name: String,
710 pub connection_pool_size: u32,
711 pub pool_min: Option<u32>,
712 pub pool_max: Option<u32>,
713 pub cache_ttl: u64,
714 pub cache_cleanup_interval: u64,
715 pub cache_max_capacity: u64,
716}
717
718#[derive(Debug, Clone, Serialize, Deserialize)]
719#[serde(default)]
720pub struct RedisConnection {
721 pub host: String,
722 pub port: u16,
723 pub db: u32,
724 pub username: Option<String>,
725 pub password: Option<String>,
726 pub key_prefix: String,
727 pub sentinels: Vec<RedisSentinel>,
728 pub sentinel_password: Option<String>,
729 pub name: String,
730 pub cluster: RedisClusterConnection,
731 pub cluster_nodes: Vec<ClusterNode>,
733}
734
735#[derive(Debug, Clone, Serialize, Deserialize)]
736#[serde(default)]
737pub struct RedisSentinel {
738 pub host: String,
739 pub port: u16,
740}
741
742#[derive(Debug, Clone, Serialize, Deserialize, Default)]
743#[serde(default)]
744pub struct RedisClusterConnection {
745 pub nodes: Vec<ClusterNode>,
746 pub username: Option<String>,
747 pub password: Option<String>,
748 #[serde(alias = "useTLS")]
749 pub use_tls: bool,
750}
751
752impl RedisConnection {
753 pub fn is_sentinel_configured(&self) -> bool {
755 !self.sentinels.is_empty()
756 }
757
758 pub fn to_url(&self) -> String {
760 if self.is_sentinel_configured() {
761 self.build_sentinel_url()
762 } else {
763 self.build_standard_url()
764 }
765 }
766
767 fn build_standard_url(&self) -> String {
768 let (scheme, host) = if self.host.starts_with("rediss://") {
770 ("rediss://", self.host.trim_start_matches("rediss://"))
771 } else if self.host.starts_with("redis://") {
772 ("redis://", self.host.trim_start_matches("redis://"))
773 } else {
774 ("redis://", self.host.as_str())
775 };
776
777 let mut url = String::from(scheme);
778
779 if let Some(ref username) = self.username {
780 url.push_str(username);
781 if let Some(ref password) = self.password {
782 url.push(':');
783 url.push_str(&urlencoding::encode(password));
784 }
785 url.push('@');
786 } else if let Some(ref password) = self.password {
787 url.push(':');
788 url.push_str(&urlencoding::encode(password));
789 url.push('@');
790 }
791
792 url.push_str(host);
793 url.push(':');
794 url.push_str(&self.port.to_string());
795 url.push('/');
796 url.push_str(&self.db.to_string());
797
798 url
799 }
800
801 fn build_sentinel_url(&self) -> String {
802 let mut url = String::from("redis+sentinel://");
803
804 if let Some(ref sentinel_password) = self.sentinel_password {
805 url.push(':');
806 url.push_str(&urlencoding::encode(sentinel_password));
807 url.push('@');
808 }
809
810 let sentinel_hosts: Vec<String> = self
811 .sentinels
812 .iter()
813 .map(|s| format!("{}:{}", s.host, s.port))
814 .collect();
815 url.push_str(&sentinel_hosts.join(","));
816
817 url.push('/');
818 url.push_str(&self.name);
819 url.push('/');
820 url.push_str(&self.db.to_string());
821
822 let mut params = Vec::new();
823 if let Some(ref password) = self.password {
824 params.push(format!("password={}", urlencoding::encode(password)));
825 }
826 if let Some(ref username) = self.username {
827 params.push(format!("username={}", urlencoding::encode(username)));
828 }
829
830 if !params.is_empty() {
831 url.push('?');
832 url.push_str(¶ms.join("&"));
833 }
834
835 url
836 }
837
838 pub fn has_cluster_nodes(&self) -> bool {
841 !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
842 }
843
844 pub fn cluster_node_urls(&self) -> Vec<String> {
847 if !self.cluster.nodes.is_empty() {
848 return self.build_cluster_urls(&self.cluster.nodes);
849 }
850 self.build_cluster_urls(&self.cluster_nodes)
851 }
852
853 pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
856 self.build_cluster_urls(
857 &seeds
858 .iter()
859 .filter_map(|seed| ClusterNode::from_seed(seed))
860 .collect::<Vec<ClusterNode>>(),
861 )
862 }
863
864 fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
865 let username = self
866 .cluster
867 .username
868 .as_deref()
869 .or(self.username.as_deref());
870 let password = self
871 .cluster
872 .password
873 .as_deref()
874 .or(self.password.as_deref());
875 let use_tls = self.cluster.use_tls;
876
877 nodes
878 .iter()
879 .map(|node| node.to_url_with_options(use_tls, username, password))
880 .collect()
881 }
882}
883
884impl RedisSentinel {
885 pub fn to_host_port(&self) -> String {
886 format!("{}:{}", self.host, self.port)
887 }
888}
889
890#[derive(Debug, Clone, Serialize, Deserialize)]
891#[serde(default)]
892pub struct ClusterNode {
893 pub host: String,
894 pub port: u16,
895}
896
897impl ClusterNode {
898 pub fn to_url(&self) -> String {
899 self.to_url_with_options(false, None, None)
900 }
901
902 pub fn to_url_with_options(
903 &self,
904 use_tls: bool,
905 username: Option<&str>,
906 password: Option<&str>,
907 ) -> String {
908 let host = self.host.trim();
909
910 if host.starts_with("redis://") || host.starts_with("rediss://") {
911 if let Ok(parsed) = Url::parse(host)
912 && let Some(host_str) = parsed.host_str()
913 {
914 let scheme = parsed.scheme();
915 let port = parsed.port_or_known_default().unwrap_or(self.port);
916 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
917 let parsed_password = parsed.password();
918 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
919 let (effective_username, effective_password) = if has_embedded_auth {
920 (parsed_username, parsed_password)
921 } else {
922 (username, password)
923 };
924
925 return build_redis_url(
926 scheme,
927 host_str,
928 port,
929 effective_username,
930 effective_password,
931 );
932 }
933
934 let has_port = if let Some(bracket_pos) = host.rfind(']') {
936 host[bracket_pos..].contains(':')
937 } else {
938 host.split(':').count() >= 3
939 };
940 let base = if has_port {
941 host.to_string()
942 } else {
943 format!("{}:{}", host, self.port)
944 };
945
946 if let Ok(parsed) = Url::parse(&base) {
947 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
948 let parsed_password = parsed.password();
949 if let Some(host_str) = parsed.host_str() {
950 let port = parsed.port_or_known_default().unwrap_or(self.port);
951 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
952 let (effective_username, effective_password) = if has_embedded_auth {
953 (parsed_username, parsed_password)
954 } else {
955 (username, password)
956 };
957 return build_redis_url(
958 parsed.scheme(),
959 host_str,
960 port,
961 effective_username,
962 effective_password,
963 );
964 }
965 }
966 return base;
967 }
968
969 let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
970 let scheme = if use_tls { "rediss" } else { "redis" };
971 build_redis_url(
972 scheme,
973 &normalized_host,
974 normalized_port,
975 username,
976 password,
977 )
978 }
979
980 pub fn from_seed(seed: &str) -> Option<Self> {
981 let trimmed = seed.trim();
982 if trimmed.is_empty() {
983 return None;
984 }
985
986 if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
987 let port = Url::parse(trimmed)
988 .ok()
989 .and_then(|parsed| parsed.port_or_known_default())
990 .unwrap_or(6379);
991 return Some(Self {
992 host: trimmed.to_string(),
993 port,
994 });
995 }
996
997 let (host, port) = split_plain_host_and_port(trimmed, 6379);
998 Some(Self { host, port })
999 }
1000}
1001
1002fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
1003 let host = raw_host.trim();
1004
1005 if host.starts_with('[') {
1007 if let Some(end_bracket) = host.find(']') {
1008 let host_part = host[1..end_bracket].to_string();
1009 let remainder = &host[end_bracket + 1..];
1010 if let Some(port_str) = remainder.strip_prefix(':')
1011 && let Ok(port) = port_str.parse::<u16>()
1012 {
1013 return (host_part, port);
1014 }
1015 return (host_part, default_port);
1016 }
1017 return (host.to_string(), default_port);
1018 }
1019
1020 if host.matches(':').count() == 1
1022 && let Some((host_part, port_part)) = host.rsplit_once(':')
1023 && let Ok(port) = port_part.parse::<u16>()
1024 {
1025 return (host_part.to_string(), port);
1026 }
1027
1028 (host.to_string(), default_port)
1029}
1030
1031fn build_redis_url(
1032 scheme: &str,
1033 host: &str,
1034 port: u16,
1035 username: Option<&str>,
1036 password: Option<&str>,
1037) -> String {
1038 let mut url = format!("{scheme}://");
1039
1040 if let Some(user) = username {
1041 url.push_str(&urlencoding::encode(user));
1042 if let Some(pass) = password {
1043 url.push(':');
1044 url.push_str(&urlencoding::encode(pass));
1045 }
1046 url.push('@');
1047 } else if let Some(pass) = password {
1048 url.push(':');
1049 url.push_str(&urlencoding::encode(pass));
1050 url.push('@');
1051 }
1052
1053 if host.contains(':') && !host.starts_with('[') {
1054 url.push('[');
1055 url.push_str(host);
1056 url.push(']');
1057 } else {
1058 url.push_str(host);
1059 }
1060 url.push(':');
1061 url.push_str(&port.to_string());
1062 url
1063}
1064
1065#[derive(Debug, Clone, Serialize, Deserialize)]
1066#[serde(default)]
1067pub struct DatabasePooling {
1068 pub enabled: bool,
1069 pub min: u32,
1070 pub max: u32,
1071}
1072
1073#[derive(Debug, Clone, Serialize, Deserialize)]
1074#[serde(default)]
1075pub struct EventLimits {
1076 pub max_channels_at_once: u32,
1077 pub max_name_length: u32,
1078 pub max_payload_in_kb: u32,
1079 pub max_batch_size: u32,
1080}
1081
1082#[derive(Debug, Clone, Serialize, Deserialize)]
1083#[serde(default)]
1084pub struct IdempotencyConfig {
1085 pub enabled: bool,
1087 pub ttl_seconds: u64,
1089 pub max_key_length: usize,
1091}
1092
1093#[derive(Debug, Clone, Serialize, Deserialize)]
1094#[serde(default)]
1095pub struct EphemeralConfig {
1096 pub enabled: bool,
1098}
1099
1100#[derive(Debug, Clone, Serialize, Deserialize)]
1101#[serde(default)]
1102pub struct EchoControlConfig {
1103 pub enabled: bool,
1105 pub default_echo_messages: bool,
1107}
1108
1109#[derive(Debug, Clone, Serialize, Deserialize)]
1110#[serde(default)]
1111pub struct EventNameFilteringConfig {
1112 pub enabled: bool,
1114 pub max_events_per_filter: usize,
1116 pub max_event_name_length: usize,
1118}
1119
1120#[derive(Debug, Clone, Serialize, Deserialize)]
1121#[serde(default)]
1122pub struct ConnectionRecoveryConfig {
1123 pub enabled: bool,
1128 pub buffer_ttl_seconds: u64,
1130 pub max_buffer_size: usize,
1132}
1133
1134#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
1135#[serde(rename_all = "lowercase")]
1136pub enum VersionStoreDriver {
1137 #[default]
1138 Memory,
1139 Postgres,
1140 Mysql,
1141 DynamoDb,
1142 ScyllaDb,
1143 SurrealDb,
1144}
1145
1146impl FromStr for VersionStoreDriver {
1147 type Err = String;
1148 fn from_str(s: &str) -> Result<Self, Self::Err> {
1149 match s.to_lowercase().as_str() {
1150 "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1151 "mysql" => Ok(Self::Mysql),
1152 "dynamodb" => Ok(Self::DynamoDb),
1153 "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1154 "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1155 "memory" => Ok(Self::Memory),
1156 _ => Err(format!("Unknown version store driver: {s}")),
1157 }
1158 }
1159}
1160
1161#[derive(Debug, Clone, Serialize, Deserialize)]
1162#[serde(default)]
1163pub struct VersionedMessagesConfig {
1164 pub enabled: bool,
1166 pub driver: VersionStoreDriver,
1168 pub max_page_size: usize,
1170 pub retention_window_seconds: u64,
1178 pub purge_interval_seconds: u64,
1181 pub purge_batch_size: usize,
1185 pub max_purge_per_tick: usize,
1188}
1189
1190#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1191#[serde(default)]
1192pub struct AnnotationsConfig {
1193 pub enabled: bool,
1196}
1197
1198#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
1199#[serde(rename_all = "lowercase")]
1200pub enum HistoryBackend {
1201 #[default]
1202 Postgres,
1203 Mysql,
1204 DynamoDb,
1205 SurrealDb,
1206 ScyllaDb,
1207 Memory,
1208}
1209
1210impl FromStr for HistoryBackend {
1211 type Err = String;
1212 fn from_str(s: &str) -> Result<Self, Self::Err> {
1213 match s.to_lowercase().as_str() {
1214 "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1215 "mysql" => Ok(Self::Mysql),
1216 "dynamodb" => Ok(Self::DynamoDb),
1217 "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1218 "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1219 "memory" => Ok(Self::Memory),
1220 _ => Err(format!("Unknown history backend: {s}")),
1221 }
1222 }
1223}
1224
1225#[derive(Debug, Clone, Serialize, Deserialize)]
1226#[serde(default)]
1227pub struct PostgresHistoryConfig {
1228 pub table_prefix: String,
1229 pub write_timeout_ms: u64,
1230}
1231
1232impl Default for PostgresHistoryConfig {
1233 fn default() -> Self {
1234 Self {
1235 table_prefix: "sockudo_history".to_string(),
1236 write_timeout_ms: 5000,
1237 }
1238 }
1239}
1240
1241#[derive(Debug, Clone, Serialize, Deserialize)]
1242#[serde(default)]
1243pub struct MySqlHistoryConfig {
1244 pub table_prefix: String,
1245 pub write_timeout_ms: u64,
1246}
1247
1248impl Default for MySqlHistoryConfig {
1249 fn default() -> Self {
1250 Self {
1251 table_prefix: "sockudo_history".to_string(),
1252 write_timeout_ms: 5000,
1253 }
1254 }
1255}
1256
1257#[derive(Debug, Clone, Serialize, Deserialize)]
1258#[serde(default)]
1259pub struct DynamoDbHistoryConfig {
1260 pub table_prefix: String,
1261 pub write_timeout_ms: u64,
1262}
1263
1264impl Default for DynamoDbHistoryConfig {
1265 fn default() -> Self {
1266 Self {
1267 table_prefix: "sockudo_history".to_string(),
1268 write_timeout_ms: 5000,
1269 }
1270 }
1271}
1272
1273#[derive(Debug, Clone, Serialize, Deserialize)]
1274#[serde(default)]
1275pub struct SurrealDbHistoryConfig {
1276 pub table_prefix: String,
1277 pub write_timeout_ms: u64,
1278}
1279
1280impl Default for SurrealDbHistoryConfig {
1281 fn default() -> Self {
1282 Self {
1283 table_prefix: "sockudo_history".to_string(),
1284 write_timeout_ms: 5000,
1285 }
1286 }
1287}
1288
1289#[derive(Debug, Clone, Serialize, Deserialize)]
1290#[serde(default)]
1291pub struct ScyllaDbHistoryConfig {
1292 pub table_prefix: String,
1293 pub write_timeout_ms: u64,
1294}
1295
1296impl Default for ScyllaDbHistoryConfig {
1297 fn default() -> Self {
1298 Self {
1299 table_prefix: "sockudo_history".to_string(),
1300 write_timeout_ms: 5000,
1301 }
1302 }
1303}
1304
1305#[derive(Debug, Clone, Serialize, Deserialize)]
1306#[serde(default)]
1307pub struct HistoryConfig {
1308 pub enabled: bool,
1309 pub rewind_enabled: bool,
1310 pub backend: HistoryBackend,
1311 pub retention_window_seconds: u64,
1312 pub max_page_size: usize,
1313 pub max_messages_per_channel: Option<usize>,
1314 pub max_bytes_per_channel: Option<u64>,
1315 pub writer_shards: usize,
1316 pub writer_queue_capacity: usize,
1317 pub purge_interval_seconds: u64,
1318 pub purge_batch_size: usize,
1319 pub max_purge_per_tick: usize,
1320 pub postgres: PostgresHistoryConfig,
1321 pub mysql: MySqlHistoryConfig,
1322 pub dynamodb: DynamoDbHistoryConfig,
1323 pub surrealdb: SurrealDbHistoryConfig,
1324 pub scylladb: ScyllaDbHistoryConfig,
1325}
1326
1327impl Default for HistoryConfig {
1328 fn default() -> Self {
1329 Self {
1330 enabled: false,
1331 rewind_enabled: true,
1332 backend: HistoryBackend::Postgres,
1333 retention_window_seconds: 86400,
1334 max_page_size: 100,
1335 max_messages_per_channel: None,
1336 max_bytes_per_channel: None,
1337 writer_shards: 16,
1338 writer_queue_capacity: 4096,
1339 purge_interval_seconds: 300,
1340 purge_batch_size: 1000,
1341 max_purge_per_tick: 100_000,
1342 postgres: PostgresHistoryConfig::default(),
1343 mysql: MySqlHistoryConfig::default(),
1344 dynamodb: DynamoDbHistoryConfig::default(),
1345 surrealdb: SurrealDbHistoryConfig::default(),
1346 scylladb: ScyllaDbHistoryConfig::default(),
1347 }
1348 }
1349}
1350
1351impl Default for VersionedMessagesConfig {
1352 fn default() -> Self {
1353 Self {
1354 enabled: false,
1355 driver: VersionStoreDriver::Memory,
1356 max_page_size: 100,
1357 retention_window_seconds: 0,
1358 purge_interval_seconds: 300,
1359 purge_batch_size: 1000,
1360 max_purge_per_tick: 100_000,
1361 }
1362 }
1363}
1364
1365#[derive(Debug, Clone, Serialize, Deserialize)]
1366#[serde(default)]
1367pub struct PresenceHistoryConfig {
1368 pub enabled: bool,
1369 pub retention_window_seconds: u64,
1370 pub max_page_size: usize,
1371 pub max_events_per_channel: Option<usize>,
1372 pub max_bytes_per_channel: Option<u64>,
1373}
1374
1375impl Default for PresenceHistoryConfig {
1376 fn default() -> Self {
1377 Self {
1378 enabled: false,
1379 retention_window_seconds: 86400,
1380 max_page_size: 100,
1381 max_events_per_channel: None,
1382 max_bytes_per_channel: None,
1383 }
1384 }
1385}
1386
1387#[derive(Debug, Clone, Serialize, Deserialize)]
1388#[serde(default)]
1389pub struct HttpApiConfig {
1390 pub request_limit_in_mb: u32,
1391 pub accept_traffic: AcceptTraffic,
1392 pub usage_enabled: bool,
1393}
1394
1395#[derive(Debug, Clone, Serialize, Deserialize)]
1396#[serde(default)]
1397pub struct AcceptTraffic {
1398 pub memory_threshold: f64,
1399}
1400
1401#[derive(Debug, Clone, Serialize, Deserialize)]
1402#[serde(default)]
1403pub struct InstanceConfig {
1404 pub process_id: String,
1405}
1406
1407#[derive(Debug, Clone, Serialize, Deserialize)]
1408#[serde(default)]
1409pub struct MetricsConfig {
1410 pub enabled: bool,
1411 pub driver: MetricsDriver,
1412 pub host: String,
1413 pub prometheus: PrometheusConfig,
1414 pub port: u16,
1415}
1416
1417#[derive(Debug, Clone, Serialize, Deserialize)]
1418#[serde(default)]
1419pub struct PrometheusConfig {
1420 pub prefix: String,
1421}
1422
1423#[derive(Debug, Clone, Serialize, Deserialize)]
1424#[serde(default)]
1425pub struct LoggingConfig {
1426 pub colors_enabled: bool,
1427 pub include_target: bool,
1428}
1429
1430#[derive(Debug, Clone, Serialize, Deserialize)]
1431#[serde(default)]
1432pub struct PresenceConfig {
1433 pub max_members_per_channel: u32,
1434 pub max_member_size_in_kb: u32,
1435}
1436
1437#[derive(Debug, Clone, Serialize, Deserialize)]
1440#[serde(default)]
1441pub struct WebSocketConfig {
1442 pub max_messages: Option<usize>,
1443 pub max_bytes: Option<usize>,
1444 pub disconnect_on_buffer_full: bool,
1445 pub max_message_size: usize,
1446 pub max_frame_size: usize,
1447 pub write_buffer_size: usize,
1448 pub max_backpressure: usize,
1449 pub auto_ping: bool,
1450 pub ping_interval: u32,
1451 pub idle_timeout: u32,
1452 pub compression: String,
1453}
1454
1455impl Default for WebSocketConfig {
1456 fn default() -> Self {
1457 Self {
1458 max_messages: Some(1000),
1459 max_bytes: None,
1460 disconnect_on_buffer_full: true,
1461 max_message_size: 64 * 1024 * 1024,
1462 max_frame_size: 16 * 1024 * 1024,
1463 write_buffer_size: 16 * 1024,
1464 max_backpressure: 1024 * 1024,
1465 auto_ping: true,
1466 ping_interval: 30,
1467 idle_timeout: 120,
1468 compression: "disabled".to_string(),
1469 }
1470 }
1471}
1472
1473impl WebSocketConfig {
1474 pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
1476 use crate::websocket::{BufferLimit, WebSocketBufferConfig};
1477
1478 let limit = match (self.max_messages, self.max_bytes) {
1479 (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
1480 (Some(messages), None) => BufferLimit::Messages(messages),
1481 (None, Some(bytes)) => BufferLimit::Bytes(bytes),
1482 (None, None) => BufferLimit::Messages(1000),
1483 };
1484
1485 WebSocketBufferConfig {
1486 limit,
1487 disconnect_on_full: self.disconnect_on_buffer_full,
1488 }
1489 }
1490
1491 pub fn to_sockudo_ws_config(
1493 &self,
1494 websocket_max_payload_kb: u32,
1495 activity_timeout: u64,
1496 ) -> sockudo_ws::Config {
1497 use sockudo_ws::Compression;
1498
1499 let compression = match self.compression.to_lowercase().as_str() {
1500 "dedicated" => Compression::Dedicated,
1501 "shared" => Compression::Shared,
1502 "window256b" => Compression::Window256B,
1503 "window1kb" => Compression::Window1KB,
1504 "window2kb" => Compression::Window2KB,
1505 "window4kb" => Compression::Window4KB,
1506 "window8kb" => Compression::Window8KB,
1507 "window16kb" => Compression::Window16KB,
1508 "window32kb" => Compression::Window32KB,
1509 _ => Compression::Disabled,
1510 };
1511
1512 sockudo_ws::Config::builder()
1513 .max_payload_length(
1514 self.max_bytes
1515 .unwrap_or(websocket_max_payload_kb as usize * 1024),
1516 )
1517 .max_message_size(self.max_message_size)
1518 .max_frame_size(self.max_frame_size)
1519 .write_buffer_size(self.write_buffer_size)
1520 .max_backpressure(self.max_backpressure)
1521 .idle_timeout(self.idle_timeout)
1522 .auto_ping(self.auto_ping)
1523 .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1524 .compression(compression)
1525 .build()
1526 }
1527}
1528
1529#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1530#[serde(default)]
1531pub struct QueueConfig {
1532 pub driver: QueueDriver,
1533 pub redis: RedisQueueConfig,
1534 pub redis_cluster: RedisClusterQueueConfig,
1535 pub nats: NatsAdapterConfig,
1536 pub pulsar: PulsarAdapterConfig,
1537 pub rabbitmq: RabbitMqAdapterConfig,
1538 pub google_pubsub: GooglePubSubAdapterConfig,
1539 pub kafka: KafkaAdapterConfig,
1540 pub iggy: IggyConfig,
1541 pub sqs: SqsQueueConfig,
1542 pub sns: SnsQueueConfig,
1543}
1544
1545#[derive(Debug, Clone, Serialize, Deserialize)]
1546#[serde(default)]
1547pub struct RedisQueueConfig {
1548 pub concurrency: u32,
1549 pub prefix: Option<String>,
1550 pub url_override: Option<String>,
1551 pub cluster_mode: bool,
1552}
1553
1554#[derive(Clone, Debug, Serialize, Deserialize)]
1555#[serde(default)]
1556pub struct RateLimit {
1557 pub max_requests: u32,
1558 pub window_seconds: u64,
1559 pub identifier: Option<String>,
1560 pub trust_hops: Option<u32>,
1561}
1562
1563#[derive(Debug, Clone, Serialize, Deserialize)]
1564#[serde(default)]
1565pub struct RateLimiterConfig {
1566 pub enabled: bool,
1567 pub driver: CacheDriver,
1568 pub api_rate_limit: RateLimit,
1569 pub websocket_rate_limit: RateLimit,
1570 pub redis: RedisConfig,
1571}
1572
1573#[derive(Debug, Clone, Serialize, Deserialize)]
1574#[serde(default)]
1575pub struct SslConfig {
1576 pub enabled: bool,
1577 pub cert_path: String,
1578 pub key_path: String,
1579 pub passphrase: Option<String>,
1580 pub ca_path: Option<String>,
1581 pub redirect_http: bool,
1582 pub http_port: Option<u16>,
1583}
1584
1585#[derive(Debug, Clone, Serialize, Deserialize)]
1586#[serde(default)]
1587pub struct WebhooksConfig {
1588 pub batching: BatchingConfig,
1589 pub retry: WebhookRetryConfig,
1590 pub request_timeout_ms: u64,
1591}
1592
1593#[derive(Debug, Clone, Serialize, Deserialize)]
1594#[serde(default)]
1595pub struct BatchingConfig {
1596 pub enabled: bool,
1597 pub duration: u64,
1598 pub size: usize,
1599}
1600
1601#[derive(Debug, Clone, Serialize, Deserialize)]
1602#[serde(default)]
1603pub struct WebhookRetryConfig {
1604 pub enabled: bool,
1605 pub max_attempts: Option<u32>,
1606 pub max_elapsed_time_ms: u64,
1607 pub initial_backoff_ms: u64,
1608 pub max_backoff_ms: u64,
1609}
1610
1611impl Default for WebhooksConfig {
1612 fn default() -> Self {
1613 Self {
1614 batching: BatchingConfig::default(),
1615 retry: WebhookRetryConfig::default(),
1616 request_timeout_ms: 10_000,
1617 }
1618 }
1619}
1620
1621impl Default for WebhookRetryConfig {
1622 fn default() -> Self {
1623 Self {
1624 enabled: true,
1625 max_attempts: None,
1626 max_elapsed_time_ms: 300_000,
1627 initial_backoff_ms: 1_000,
1628 max_backoff_ms: 60_000,
1629 }
1630 }
1631}
1632
1633#[derive(Debug, Clone, Serialize, Deserialize)]
1634#[serde(default)]
1635pub struct ClusterHealthConfig {
1636 pub enabled: bool,
1637 pub heartbeat_interval_ms: u64,
1638 pub node_timeout_ms: u64,
1639 pub cleanup_interval_ms: u64,
1640}
1641
1642#[derive(Debug, Clone, Serialize, Deserialize)]
1643#[serde(default)]
1644pub struct UnixSocketConfig {
1645 pub enabled: bool,
1646 pub path: String,
1647 #[serde(deserialize_with = "deserialize_octal_permission")]
1648 pub permission_mode: u32,
1649}
1650
1651#[derive(Debug, Clone, Serialize, Deserialize)]
1652#[serde(default)]
1653pub struct DeltaCompressionOptionsConfig {
1654 pub enabled: bool,
1655 pub algorithm: String,
1656 pub full_message_interval: u32,
1657 pub min_message_size: usize,
1658 pub max_state_age_secs: u64,
1659 pub max_channel_states_per_socket: usize,
1660 pub max_conflation_states_per_channel: Option<usize>,
1661 pub conflation_key_path: Option<String>,
1662 pub cluster_coordination: bool,
1663 pub coordination_backend: DeltaCoordinationBackend,
1664 pub omit_delta_algorithm: bool,
1665}
1666
1667#[derive(Debug, Clone, Serialize, Deserialize)]
1669#[serde(default)]
1670pub struct CleanupConfig {
1671 pub queue_buffer_size: usize,
1672 pub batch_size: usize,
1673 pub batch_timeout_ms: u64,
1674 pub worker_threads: WorkerThreadsConfig,
1675 pub max_retry_attempts: u32,
1676 pub async_enabled: bool,
1677 pub fallback_to_sync: bool,
1678}
1679
1680#[derive(Debug, Clone)]
1682pub enum WorkerThreadsConfig {
1683 Auto,
1684 Fixed(usize),
1685}
1686
1687impl serde::Serialize for WorkerThreadsConfig {
1688 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1689 where
1690 S: serde::Serializer,
1691 {
1692 match self {
1693 WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1694 WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1695 }
1696 }
1697}
1698
1699impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1700 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1701 where
1702 D: serde::Deserializer<'de>,
1703 {
1704 use serde::de;
1705 struct WorkerThreadsVisitor;
1706 impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1707 type Value = WorkerThreadsConfig;
1708
1709 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1710 formatter.write_str(r#""auto" or a positive integer"#)
1711 }
1712
1713 fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1714 if value.eq_ignore_ascii_case("auto") {
1715 Ok(WorkerThreadsConfig::Auto)
1716 } else if let Ok(n) = value.parse::<usize>() {
1717 Ok(WorkerThreadsConfig::Fixed(n))
1718 } else {
1719 Err(E::custom(format!(
1720 "expected 'auto' or a number, got '{value}'"
1721 )))
1722 }
1723 }
1724
1725 fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1726 Ok(WorkerThreadsConfig::Fixed(value as usize))
1727 }
1728
1729 fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1730 if value >= 0 {
1731 Ok(WorkerThreadsConfig::Fixed(value as usize))
1732 } else {
1733 Err(E::custom("worker_threads must be non-negative"))
1734 }
1735 }
1736 }
1737 deserializer.deserialize_any(WorkerThreadsVisitor)
1738 }
1739}
1740
1741impl Default for CleanupConfig {
1742 fn default() -> Self {
1743 Self {
1744 queue_buffer_size: 1024,
1745 batch_size: 64,
1746 batch_timeout_ms: 100,
1747 worker_threads: WorkerThreadsConfig::Auto,
1748 max_retry_attempts: 3,
1749 async_enabled: true,
1750 fallback_to_sync: true,
1751 }
1752 }
1753}
1754
1755impl CleanupConfig {
1756 pub fn validate(&self) -> Result<(), String> {
1757 if self.queue_buffer_size == 0 {
1758 return Err("queue_buffer_size must be greater than 0".to_string());
1759 }
1760 if self.batch_size == 0 {
1761 return Err("batch_size must be greater than 0".to_string());
1762 }
1763 if self.batch_timeout_ms == 0 {
1764 return Err("batch_timeout_ms must be greater than 0".to_string());
1765 }
1766 if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1767 && n == 0
1768 {
1769 return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1770 }
1771 Ok(())
1772 }
1773}
1774
1775#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1776#[serde(default)]
1777pub struct TagFilteringConfig {
1778 #[serde(default)]
1779 pub enabled: bool,
1780 #[serde(default = "default_true")]
1781 pub enable_tags: bool,
1782}
1783
1784fn default_true() -> bool {
1785 true
1786}
1787
1788impl Default for ServerOptions {
1791 fn default() -> Self {
1792 Self {
1793 adapter: AdapterConfig::default(),
1794 app_manager: AppManagerConfig::default(),
1795 cache: CacheConfig::default(),
1796 channel_limits: ChannelLimits::default(),
1797 cors: CorsConfig::default(),
1798 database: DatabaseConfig::default(),
1799 database_pooling: DatabasePooling::default(),
1800 debug: false,
1801 tag_filtering: TagFilteringConfig::default(),
1802 event_limits: EventLimits::default(),
1803 host: "0.0.0.0".to_string(),
1804 http_api: HttpApiConfig::default(),
1805 instance: InstanceConfig::default(),
1806 logging: None,
1807 metrics: MetricsConfig::default(),
1808 mode: "production".to_string(),
1809 port: 6001,
1810 path_prefix: "/".to_string(),
1811 presence: PresenceConfig::default(),
1812 queue: QueueConfig::default(),
1813 rate_limiter: RateLimiterConfig::default(),
1814 shutdown_grace_period: 10,
1815 ssl: SslConfig::default(),
1816 user_authentication_timeout: 3600,
1817 webhooks: WebhooksConfig::default(),
1818 websocket_max_payload_kb: 64,
1819 cleanup: CleanupConfig::default(),
1820 activity_timeout: 120,
1821 cluster_health: ClusterHealthConfig::default(),
1822 unix_socket: UnixSocketConfig::default(),
1823 delta_compression: DeltaCompressionOptionsConfig::default(),
1824 websocket: WebSocketConfig::default(),
1825 connection_recovery: ConnectionRecoveryConfig::default(),
1826 history: HistoryConfig::default(),
1827 presence_history: PresenceHistoryConfig::default(),
1828 idempotency: IdempotencyConfig::default(),
1829 ephemeral: EphemeralConfig::default(),
1830 echo_control: EchoControlConfig::default(),
1831 event_name_filtering: EventNameFilteringConfig::default(),
1832 versioned_messages: VersionedMessagesConfig::default(),
1833 annotations: AnnotationsConfig::default(),
1834 }
1835 }
1836}
1837
1838impl Default for SqsQueueConfig {
1839 fn default() -> Self {
1840 Self {
1841 region: "us-east-1".to_string(),
1842 queue_url_prefix: None,
1843 visibility_timeout: 30,
1844 endpoint_url: None,
1845 max_messages: 10,
1846 wait_time_seconds: 5,
1847 concurrency: 5,
1848 fifo: false,
1849 message_group_id: Some("default".to_string()),
1850 }
1851 }
1852}
1853
1854impl Default for SnsQueueConfig {
1855 fn default() -> Self {
1856 Self {
1857 region: "us-east-1".to_string(),
1858 topic_arn: String::new(),
1859 endpoint_url: None,
1860 }
1861 }
1862}
1863
1864impl Default for RedisAdapterConfig {
1865 fn default() -> Self {
1866 Self {
1867 requests_timeout: 5000,
1868 prefix: "sockudo_adapter:".to_string(),
1869 redis_pub_options: AHashMap::new(),
1870 redis_sub_options: AHashMap::new(),
1871 cluster_mode: false,
1872 }
1873 }
1874}
1875
1876impl Default for RedisClusterAdapterConfig {
1877 fn default() -> Self {
1878 Self {
1879 nodes: vec![],
1880 prefix: "sockudo_adapter:".to_string(),
1881 request_timeout_ms: 1000,
1882 use_connection_manager: true,
1883 use_sharded_pubsub: false,
1884 }
1885 }
1886}
1887
1888impl Default for NatsAdapterConfig {
1889 fn default() -> Self {
1890 Self {
1891 servers: vec!["nats://localhost:4222".to_string()],
1892 prefix: "sockudo_adapter:".to_string(),
1893 request_timeout_ms: 5000,
1894 username: None,
1895 password: None,
1896 token: None,
1897 connection_timeout_ms: 5000,
1898 nodes_number: None,
1899 discovery_max_wait_ms: 1000,
1900 discovery_idle_wait_ms: 150,
1901 }
1902 }
1903}
1904
1905impl Default for PulsarAdapterConfig {
1906 fn default() -> Self {
1907 Self {
1908 url: "pulsar://127.0.0.1:6650".to_string(),
1909 prefix: "sockudo-adapter".to_string(),
1910 request_timeout_ms: 5000,
1911 token: None,
1912 nodes_number: None,
1913 }
1914 }
1915}
1916
1917impl Default for RabbitMqAdapterConfig {
1918 fn default() -> Self {
1919 Self {
1920 url: "amqp://guest:guest@127.0.0.1:5672/%2f".to_string(),
1921 prefix: "sockudo_adapter".to_string(),
1922 request_timeout_ms: 5000,
1923 connection_timeout_ms: 5000,
1924 nodes_number: None,
1925 }
1926 }
1927}
1928
1929impl Default for GooglePubSubAdapterConfig {
1930 fn default() -> Self {
1931 Self {
1932 project_id: "".to_string(),
1933 prefix: "sockudo-adapter".to_string(),
1934 request_timeout_ms: 5000,
1935 emulator_host: None,
1936 nodes_number: None,
1937 }
1938 }
1939}
1940
1941impl Default for KafkaAdapterConfig {
1942 fn default() -> Self {
1943 Self {
1944 brokers: vec!["localhost:9092".to_string()],
1945 prefix: "sockudo_adapter".to_string(),
1946 request_timeout_ms: 5000,
1947 security_protocol: None,
1948 sasl_mechanism: None,
1949 sasl_username: None,
1950 sasl_password: None,
1951 nodes_number: None,
1952 }
1953 }
1954}
1955
1956impl Default for IggyConfig {
1957 fn default() -> Self {
1958 Self {
1959 connection_string: "iggy://iggy:iggy@127.0.0.1:8090".to_string(),
1960 username: None,
1961 password: None,
1962 consumer_name: None,
1963 stream: "sockudo".to_string(),
1964 topic_prefix: "sockudo-adapter".to_string(),
1965 queue_topic_prefix: "sockudo-queue".to_string(),
1966 consumer_group_prefix: "sockudo-workers".to_string(),
1967 request_timeout_ms: 5000,
1968 poll_interval_ms: 50,
1969 poll_batch_size: 100,
1970 partitions_count: 1,
1971 partition_id: 0,
1972 auto_create: true,
1973 start_from_latest: true,
1974 nodes_number: None,
1975 }
1976 }
1977}
1978
1979impl Default for CacheSettings {
1980 fn default() -> Self {
1981 Self {
1982 enabled: true,
1983 ttl: 300,
1984 }
1985 }
1986}
1987
1988impl Default for MemoryCacheOptions {
1989 fn default() -> Self {
1990 Self {
1991 ttl: 300,
1992 cleanup_interval: 60,
1993 max_capacity: 10000,
1994 }
1995 }
1996}
1997
1998impl Default for CacheConfig {
1999 fn default() -> Self {
2000 Self {
2001 driver: CacheDriver::default(),
2002 redis: RedisConfig {
2003 prefix: Some("sockudo_cache:".to_string()),
2004 url_override: None,
2005 cluster_mode: false,
2006 },
2007 memory: MemoryCacheOptions::default(),
2008 }
2009 }
2010}
2011
2012impl Default for ChannelLimits {
2013 fn default() -> Self {
2014 Self {
2015 max_name_length: 200,
2016 cache_ttl: 3600,
2017 }
2018 }
2019}
2020impl Default for CorsConfig {
2021 fn default() -> Self {
2022 Self {
2023 credentials: true,
2024 origin: vec!["*".to_string()],
2025 methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
2026 allowed_headers: vec![
2027 "Authorization".to_string(),
2028 "Content-Type".to_string(),
2029 "X-Requested-With".to_string(),
2030 "Accept".to_string(),
2031 ],
2032 }
2033 }
2034}
2035
2036impl Default for DatabaseConnection {
2037 fn default() -> Self {
2038 Self {
2039 host: "localhost".to_string(),
2040 port: 3306,
2041 username: "root".to_string(),
2042 password: "".to_string(),
2043 database: "sockudo".to_string(),
2044 table_name: "applications".to_string(),
2045 connection_pool_size: 10,
2046 pool_min: None,
2047 pool_max: None,
2048 cache_ttl: 300,
2049 cache_cleanup_interval: 60,
2050 cache_max_capacity: 100,
2051 }
2052 }
2053}
2054
2055impl Default for RedisConnection {
2056 fn default() -> Self {
2057 Self {
2058 host: "127.0.0.1".to_string(),
2059 port: 6379,
2060 db: 0,
2061 username: None,
2062 password: None,
2063 key_prefix: "sockudo:".to_string(),
2064 sentinels: Vec::new(),
2065 sentinel_password: None,
2066 name: "mymaster".to_string(),
2067 cluster: RedisClusterConnection::default(),
2068 cluster_nodes: Vec::new(),
2069 }
2070 }
2071}
2072
2073impl Default for RedisSentinel {
2074 fn default() -> Self {
2075 Self {
2076 host: "localhost".to_string(),
2077 port: 26379,
2078 }
2079 }
2080}
2081
2082impl Default for ClusterNode {
2083 fn default() -> Self {
2084 Self {
2085 host: "127.0.0.1".to_string(),
2086 port: 7000,
2087 }
2088 }
2089}
2090
2091impl Default for DatabasePooling {
2092 fn default() -> Self {
2093 Self {
2094 enabled: true,
2095 min: 2,
2096 max: 10,
2097 }
2098 }
2099}
2100
2101impl Default for EventLimits {
2102 fn default() -> Self {
2103 Self {
2104 max_channels_at_once: 100,
2105 max_name_length: 200,
2106 max_payload_in_kb: 100,
2107 max_batch_size: 10,
2108 }
2109 }
2110}
2111
2112impl Default for IdempotencyConfig {
2113 fn default() -> Self {
2114 Self {
2115 enabled: true,
2116 ttl_seconds: 120,
2117 max_key_length: 128,
2118 }
2119 }
2120}
2121
2122impl Default for EphemeralConfig {
2123 fn default() -> Self {
2124 Self { enabled: true }
2125 }
2126}
2127
2128impl Default for EchoControlConfig {
2129 fn default() -> Self {
2130 Self {
2131 enabled: true,
2132 default_echo_messages: true,
2133 }
2134 }
2135}
2136
2137impl Default for EventNameFilteringConfig {
2138 fn default() -> Self {
2139 Self {
2140 enabled: true,
2141 max_events_per_filter: 50,
2142 max_event_name_length: 200,
2143 }
2144 }
2145}
2146
2147impl Default for ConnectionRecoveryConfig {
2148 fn default() -> Self {
2149 Self {
2150 enabled: false,
2151 buffer_ttl_seconds: 120,
2152 max_buffer_size: 100,
2153 }
2154 }
2155}
2156
2157impl Default for HttpApiConfig {
2158 fn default() -> Self {
2159 Self {
2160 request_limit_in_mb: 10,
2161 accept_traffic: AcceptTraffic::default(),
2162 usage_enabled: true,
2163 }
2164 }
2165}
2166
2167impl Default for AcceptTraffic {
2168 fn default() -> Self {
2169 Self {
2170 memory_threshold: 0.90,
2171 }
2172 }
2173}
2174
2175impl Default for InstanceConfig {
2176 fn default() -> Self {
2177 Self {
2178 process_id: uuid::Uuid::new_v4().to_string(),
2179 }
2180 }
2181}
2182
2183impl Default for LoggingConfig {
2184 fn default() -> Self {
2185 Self {
2186 colors_enabled: true,
2187 include_target: true,
2188 }
2189 }
2190}
2191
2192impl Default for MetricsConfig {
2193 fn default() -> Self {
2194 Self {
2195 enabled: true,
2196 driver: MetricsDriver::default(),
2197 host: "0.0.0.0".to_string(),
2198 prometheus: PrometheusConfig::default(),
2199 port: 9601,
2200 }
2201 }
2202}
2203
2204impl Default for PrometheusConfig {
2205 fn default() -> Self {
2206 Self {
2207 prefix: "sockudo_".to_string(),
2208 }
2209 }
2210}
2211
2212impl Default for PresenceConfig {
2213 fn default() -> Self {
2214 Self {
2215 max_members_per_channel: 100,
2216 max_member_size_in_kb: 2,
2217 }
2218 }
2219}
2220
2221impl Default for RedisQueueConfig {
2222 fn default() -> Self {
2223 Self {
2224 concurrency: 5,
2225 prefix: Some("sockudo_queue:".to_string()),
2226 url_override: None,
2227 cluster_mode: false,
2228 }
2229 }
2230}
2231
2232impl Default for RateLimit {
2233 fn default() -> Self {
2234 Self {
2235 max_requests: 60,
2236 window_seconds: 60,
2237 identifier: Some("default".to_string()),
2238 trust_hops: Some(0),
2239 }
2240 }
2241}
2242
2243impl Default for RateLimiterConfig {
2244 fn default() -> Self {
2245 Self {
2246 enabled: true,
2247 driver: CacheDriver::Memory,
2248 api_rate_limit: RateLimit {
2249 max_requests: 100,
2250 window_seconds: 60,
2251 identifier: Some("api".to_string()),
2252 trust_hops: Some(0),
2253 },
2254 websocket_rate_limit: RateLimit {
2255 max_requests: 20,
2256 window_seconds: 60,
2257 identifier: Some("websocket_connect".to_string()),
2258 trust_hops: Some(0),
2259 },
2260 redis: RedisConfig {
2261 prefix: Some("sockudo_rl:".to_string()),
2262 url_override: None,
2263 cluster_mode: false,
2264 },
2265 }
2266 }
2267}
2268
2269impl Default for SslConfig {
2270 fn default() -> Self {
2271 Self {
2272 enabled: false,
2273 cert_path: "".to_string(),
2274 key_path: "".to_string(),
2275 passphrase: None,
2276 ca_path: None,
2277 redirect_http: false,
2278 http_port: Some(80),
2279 }
2280 }
2281}
2282
2283impl Default for BatchingConfig {
2284 fn default() -> Self {
2285 Self {
2286 enabled: true,
2287 duration: 50,
2288 size: 100,
2289 }
2290 }
2291}
2292
2293impl Default for ClusterHealthConfig {
2294 fn default() -> Self {
2295 Self {
2296 enabled: true,
2297 heartbeat_interval_ms: 10000,
2298 node_timeout_ms: 30000,
2299 cleanup_interval_ms: 10000,
2300 }
2301 }
2302}
2303
2304impl Default for UnixSocketConfig {
2305 fn default() -> Self {
2306 Self {
2307 enabled: false,
2308 path: "/var/run/sockudo/sockudo.sock".to_string(),
2309 permission_mode: 0o660,
2310 }
2311 }
2312}
2313
2314impl Default for DeltaCompressionOptionsConfig {
2315 fn default() -> Self {
2316 Self {
2317 enabled: true,
2318 algorithm: "fossil".to_string(),
2319 full_message_interval: 10,
2320 min_message_size: 100,
2321 max_state_age_secs: 300,
2322 max_channel_states_per_socket: 100,
2323 max_conflation_states_per_channel: Some(100),
2324 conflation_key_path: None,
2325 cluster_coordination: false,
2326 coordination_backend: DeltaCoordinationBackend::Auto,
2327 omit_delta_algorithm: false,
2328 }
2329 }
2330}
2331
2332#[cfg(test)]
2333mod tests {
2334 use super::{DeltaCoordinationBackend, QueueDriver, ServerOptions, VersionStoreDriver};
2335 use std::str::FromStr;
2336
2337 #[test]
2338 fn queue_driver_parses_broker_backends() {
2339 assert_eq!(
2340 QueueDriver::from_str("rabbitmq").unwrap(),
2341 QueueDriver::RabbitMq
2342 );
2343 assert_eq!(QueueDriver::from_str("kafka").unwrap(), QueueDriver::Kafka);
2344 assert_eq!(
2345 QueueDriver::from_str("pulsar").unwrap(),
2346 QueueDriver::Pulsar
2347 );
2348 assert_eq!(
2349 QueueDriver::from_str("google-pubsub").unwrap(),
2350 QueueDriver::GooglePubSub
2351 );
2352 }
2353
2354 #[test]
2355 fn delta_coordination_backend_parses_expected_values() {
2356 assert_eq!(
2357 DeltaCoordinationBackend::from_str("auto").unwrap(),
2358 DeltaCoordinationBackend::Auto
2359 );
2360 assert_eq!(
2361 DeltaCoordinationBackend::from_str("redis-cluster").unwrap(),
2362 DeltaCoordinationBackend::RedisCluster
2363 );
2364 assert_eq!(
2365 DeltaCoordinationBackend::from_str("nats").unwrap(),
2366 DeltaCoordinationBackend::Nats
2367 );
2368 }
2369
2370 #[tokio::test]
2371 async fn versioned_messages_driver_overrides_from_env() {
2372 let previous = std::env::var("VERSIONED_MESSAGES_DRIVER").ok();
2373 unsafe { std::env::set_var("VERSIONED_MESSAGES_DRIVER", "postgres") };
2376
2377 let mut options = ServerOptions::default();
2378 options.override_from_env().await.unwrap();
2379
2380 if let Some(previous) = previous {
2381 unsafe { std::env::set_var("VERSIONED_MESSAGES_DRIVER", previous) };
2383 } else {
2384 unsafe { std::env::remove_var("VERSIONED_MESSAGES_DRIVER") };
2386 }
2387
2388 assert_eq!(
2389 options.versioned_messages.driver,
2390 VersionStoreDriver::Postgres
2391 );
2392 }
2393}
2394
2395impl ClusterHealthConfig {
2396 pub fn validate(&self) -> Result<(), String> {
2397 if self.heartbeat_interval_ms == 0 {
2398 return Err("heartbeat_interval_ms must be greater than 0".to_string());
2399 }
2400 if self.node_timeout_ms == 0 {
2401 return Err("node_timeout_ms must be greater than 0".to_string());
2402 }
2403 if self.cleanup_interval_ms == 0 {
2404 return Err("cleanup_interval_ms must be greater than 0".to_string());
2405 }
2406
2407 if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
2408 return Err(format!(
2409 "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
2410 self.heartbeat_interval_ms,
2411 self.node_timeout_ms,
2412 self.node_timeout_ms / 3
2413 ));
2414 }
2415
2416 if self.cleanup_interval_ms > self.node_timeout_ms {
2417 return Err(format!(
2418 "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
2419 self.cleanup_interval_ms, self.node_timeout_ms
2420 ));
2421 }
2422
2423 Ok(())
2424 }
2425}
2426
2427impl ServerOptions {
2428 pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
2429 let content = tokio::fs::read_to_string(path).await?;
2430 let options: Self = if path.ends_with(".toml") {
2431 toml::from_str(&content)?
2432 } else {
2433 sonic_rs::from_str(&content)?
2435 };
2436 Ok(options)
2437 }
2438
2439 pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
2440 if let Ok(mode) = std::env::var("ENVIRONMENT") {
2442 self.mode = mode;
2443 }
2444 self.debug = parse_bool_env("DEBUG_MODE", self.debug);
2445 if parse_bool_env("DEBUG", false) {
2446 self.debug = true;
2447 info!("DEBUG environment variable forces debug mode ON");
2448 }
2449
2450 self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
2451
2452 if let Ok(host) = std::env::var("HOST") {
2453 self.host = host;
2454 }
2455 self.port = parse_env::<u16>("PORT", self.port);
2456 self.shutdown_grace_period =
2457 parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
2458 self.user_authentication_timeout = parse_env::<u64>(
2459 "USER_AUTHENTICATION_TIMEOUT",
2460 self.user_authentication_timeout,
2461 );
2462 self.websocket_max_payload_kb =
2463 parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
2464 if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
2465 self.instance.process_id = id;
2466 }
2467
2468 if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
2470 self.adapter.driver =
2471 parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
2472 }
2473 self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
2474 "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
2475 self.adapter.buffer_multiplier_per_cpu,
2476 );
2477 self.adapter.enable_socket_counting = parse_env::<bool>(
2478 "ADAPTER_ENABLE_SOCKET_COUNTING",
2479 self.adapter.enable_socket_counting,
2480 );
2481 self.adapter.fallback_to_local =
2482 parse_env::<bool>("ADAPTER_FALLBACK_TO_LOCAL", self.adapter.fallback_to_local);
2483 if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
2484 self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
2485 }
2486 if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
2487 self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
2488 }
2489 if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
2490 self.app_manager.driver =
2491 parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
2492 }
2493 if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
2494 self.rate_limiter.driver = parse_driver_enum(
2495 driver_str,
2496 self.rate_limiter.driver.clone(),
2497 "RateLimiter Backend",
2498 );
2499 }
2500
2501 if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
2503 self.database.redis.host = host;
2504 }
2505 self.database.redis.port =
2506 parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
2507 if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
2508 self.database.redis.username = if username.is_empty() {
2509 None
2510 } else {
2511 Some(username)
2512 };
2513 }
2514 if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
2515 self.database.redis.password = Some(password);
2516 }
2517 self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
2518 if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
2519 self.database.redis.key_prefix = prefix;
2520 }
2521 if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
2522 self.database.redis.cluster.username = if cluster_username.is_empty() {
2523 None
2524 } else {
2525 Some(cluster_username)
2526 };
2527 }
2528 if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
2529 self.database.redis.cluster.password = Some(cluster_password);
2530 }
2531 self.database.redis.cluster.use_tls = parse_bool_env(
2532 "DATABASE_REDIS_CLUSTER_USE_TLS",
2533 self.database.redis.cluster.use_tls,
2534 );
2535
2536 if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
2538 self.database.mysql.host = host;
2539 }
2540 self.database.mysql.port =
2541 parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
2542 if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
2543 self.database.mysql.username = user;
2544 }
2545 if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
2546 self.database.mysql.password = pass;
2547 }
2548 if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
2549 self.database.mysql.database = db;
2550 }
2551 if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
2552 self.database.mysql.table_name = table;
2553 }
2554 override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
2555
2556 if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
2558 self.database.postgres.host = host;
2559 }
2560 self.database.postgres.port =
2561 parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
2562 if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
2563 self.database.postgres.username = user;
2564 }
2565 if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
2566 self.database.postgres.password = pass;
2567 }
2568 if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
2569 self.database.postgres.database = db;
2570 }
2571 override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
2572
2573 if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
2575 self.database.dynamodb.region = region;
2576 }
2577 if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
2578 self.database.dynamodb.table_name = table;
2579 }
2580 if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
2581 self.database.dynamodb.endpoint_url = Some(endpoint);
2582 }
2583 if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
2584 self.database.dynamodb.aws_access_key_id = Some(key_id);
2585 }
2586 if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
2587 self.database.dynamodb.aws_secret_access_key = Some(secret);
2588 }
2589
2590 if let Ok(url) = std::env::var("DATABASE_SURREALDB_URL") {
2592 self.database.surrealdb.url = url;
2593 }
2594 if let Ok(namespace) = std::env::var("DATABASE_SURREALDB_NAMESPACE") {
2595 self.database.surrealdb.namespace = namespace;
2596 }
2597 if let Ok(database) = std::env::var("DATABASE_SURREALDB_DATABASE") {
2598 self.database.surrealdb.database = database;
2599 }
2600 if let Ok(username) = std::env::var("DATABASE_SURREALDB_USERNAME") {
2601 self.database.surrealdb.username = username;
2602 }
2603 if let Ok(password) = std::env::var("DATABASE_SURREALDB_PASSWORD") {
2604 self.database.surrealdb.password = password;
2605 }
2606 if let Ok(table) = std::env::var("DATABASE_SURREALDB_TABLE_NAME") {
2607 self.database.surrealdb.table_name = table;
2608 }
2609
2610 let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
2612 let node_list: Vec<String> = nodes
2613 .split(',')
2614 .map(|s| s.trim())
2615 .filter(|s| !s.is_empty())
2616 .map(ToString::to_string)
2617 .collect();
2618
2619 options.adapter.cluster.nodes = node_list.clone();
2620 options.queue.redis_cluster.nodes = node_list.clone();
2621
2622 let parsed_nodes: Vec<ClusterNode> = node_list
2623 .iter()
2624 .filter_map(|seed| ClusterNode::from_seed(seed))
2625 .collect();
2626 options.database.redis.cluster.nodes = parsed_nodes.clone();
2627 options.database.redis.cluster_nodes = parsed_nodes;
2628 };
2629
2630 if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
2631 apply_redis_cluster_nodes(self, &nodes);
2632 }
2633 if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
2634 apply_redis_cluster_nodes(self, &nodes);
2635 }
2636 self.queue.redis_cluster.concurrency = parse_env::<u32>(
2637 "REDIS_CLUSTER_QUEUE_CONCURRENCY",
2638 self.queue.redis_cluster.concurrency,
2639 );
2640 if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
2641 self.queue.redis_cluster.prefix = Some(prefix);
2642 }
2643
2644 self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
2646 if let Ok(val) = std::env::var("SSL_CERT_PATH") {
2647 self.ssl.cert_path = val;
2648 }
2649 if let Ok(val) = std::env::var("SSL_KEY_PATH") {
2650 self.ssl.key_path = val;
2651 }
2652 self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
2653 if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
2654 self.ssl.http_port = Some(port);
2655 }
2656
2657 self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
2659 if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
2660 self.unix_socket.path = path;
2661 }
2662 if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
2663 if mode_str.chars().all(|c| c.is_digit(8)) {
2664 if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
2665 if mode <= 0o777 {
2666 self.unix_socket.permission_mode = mode;
2667 } else {
2668 warn!(
2669 "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
2670 mode_str, self.unix_socket.permission_mode
2671 );
2672 }
2673 } else {
2674 warn!(
2675 "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
2676 mode_str, self.unix_socket.permission_mode
2677 );
2678 }
2679 } else {
2680 warn!(
2681 "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
2682 mode_str, self.unix_socket.permission_mode
2683 );
2684 }
2685 }
2686
2687 if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
2689 self.metrics.driver =
2690 parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
2691 }
2692 self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
2693 if let Ok(val) = std::env::var("METRICS_HOST") {
2694 self.metrics.host = val;
2695 }
2696 self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
2697 if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
2698 self.metrics.prometheus.prefix = val;
2699 }
2700
2701 self.http_api.usage_enabled =
2703 parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
2704
2705 self.rate_limiter.enabled =
2707 parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
2708 self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
2709 "RATE_LIMITER_API_MAX_REQUESTS",
2710 self.rate_limiter.api_rate_limit.max_requests,
2711 );
2712 self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
2713 "RATE_LIMITER_API_WINDOW_SECONDS",
2714 self.rate_limiter.api_rate_limit.window_seconds,
2715 );
2716 if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
2717 self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
2718 }
2719 self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
2720 "RATE_LIMITER_WS_MAX_REQUESTS",
2721 self.rate_limiter.websocket_rate_limit.max_requests,
2722 );
2723 self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
2724 "RATE_LIMITER_WS_WINDOW_SECONDS",
2725 self.rate_limiter.websocket_rate_limit.window_seconds,
2726 );
2727 if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
2728 self.rate_limiter.redis.prefix = Some(prefix);
2729 }
2730
2731 self.queue.redis.concurrency =
2733 parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
2734 if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
2735 self.queue.redis.prefix = Some(prefix);
2736 }
2737
2738 if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
2740 self.queue.sqs.region = region;
2741 }
2742 self.queue.sqs.visibility_timeout = parse_env::<i32>(
2743 "QUEUE_SQS_VISIBILITY_TIMEOUT",
2744 self.queue.sqs.visibility_timeout,
2745 );
2746 self.queue.sqs.max_messages =
2747 parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
2748 self.queue.sqs.wait_time_seconds = parse_env::<i32>(
2749 "QUEUE_SQS_WAIT_TIME_SECONDS",
2750 self.queue.sqs.wait_time_seconds,
2751 );
2752 self.queue.sqs.concurrency =
2753 parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
2754 self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
2755 if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
2756 self.queue.sqs.endpoint_url = Some(endpoint);
2757 }
2758
2759 if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
2761 self.queue.sns.region = region;
2762 }
2763 if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
2764 self.queue.sns.topic_arn = topic_arn;
2765 }
2766 if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
2767 self.queue.sns.endpoint_url = Some(endpoint);
2768 }
2769
2770 self.webhooks.batching.enabled =
2772 parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
2773 self.webhooks.batching.duration =
2774 parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
2775 self.webhooks.batching.size =
2776 parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
2777
2778 if let Ok(servers) = std::env::var("NATS_SERVERS") {
2780 self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2781 }
2782 if let Ok(user) = std::env::var("NATS_USERNAME") {
2783 self.adapter.nats.username = Some(user);
2784 }
2785 if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2786 self.adapter.nats.password = Some(pass);
2787 }
2788 if let Ok(token) = std::env::var("NATS_TOKEN") {
2789 self.adapter.nats.token = Some(token);
2790 }
2791 if let Ok(prefix) = std::env::var("NATS_PREFIX") {
2792 self.adapter.nats.prefix = prefix;
2793 }
2794 self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2795 "NATS_CONNECTION_TIMEOUT_MS",
2796 self.adapter.nats.connection_timeout_ms,
2797 );
2798 self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2799 "NATS_REQUEST_TIMEOUT_MS",
2800 self.adapter.nats.request_timeout_ms,
2801 );
2802 self.adapter.nats.discovery_max_wait_ms = parse_env::<u64>(
2803 "NATS_DISCOVERY_MAX_WAIT_MS",
2804 self.adapter.nats.discovery_max_wait_ms,
2805 );
2806 self.adapter.nats.discovery_idle_wait_ms = parse_env::<u64>(
2807 "NATS_DISCOVERY_IDLE_WAIT_MS",
2808 self.adapter.nats.discovery_idle_wait_ms,
2809 );
2810 if let Some(nodes) = parse_env_optional::<u32>("NATS_NODES_NUMBER") {
2811 self.adapter.nats.nodes_number = Some(nodes);
2812 }
2813
2814 if let Ok(url) = std::env::var("PULSAR_URL") {
2816 self.adapter.pulsar.url = url;
2817 }
2818 if let Ok(prefix) = std::env::var("PULSAR_PREFIX") {
2819 self.adapter.pulsar.prefix = prefix;
2820 }
2821 if let Ok(token) = std::env::var("PULSAR_TOKEN") {
2822 self.adapter.pulsar.token = Some(token);
2823 }
2824 self.adapter.pulsar.request_timeout_ms = parse_env::<u64>(
2825 "PULSAR_REQUEST_TIMEOUT_MS",
2826 self.adapter.pulsar.request_timeout_ms,
2827 );
2828 if let Some(nodes) = parse_env_optional::<u32>("PULSAR_NODES_NUMBER") {
2829 self.adapter.pulsar.nodes_number = Some(nodes);
2830 }
2831
2832 if let Ok(url) = std::env::var("RABBITMQ_URL") {
2834 self.adapter.rabbitmq.url = url;
2835 }
2836 if let Ok(prefix) = std::env::var("RABBITMQ_PREFIX") {
2837 self.adapter.rabbitmq.prefix = prefix;
2838 }
2839 self.adapter.rabbitmq.connection_timeout_ms = parse_env::<u64>(
2840 "RABBITMQ_CONNECTION_TIMEOUT_MS",
2841 self.adapter.rabbitmq.connection_timeout_ms,
2842 );
2843 self.adapter.rabbitmq.request_timeout_ms = parse_env::<u64>(
2844 "RABBITMQ_REQUEST_TIMEOUT_MS",
2845 self.adapter.rabbitmq.request_timeout_ms,
2846 );
2847 if let Some(nodes) = parse_env_optional::<u32>("RABBITMQ_NODES_NUMBER") {
2848 self.adapter.rabbitmq.nodes_number = Some(nodes);
2849 }
2850
2851 if let Ok(project_id) = std::env::var("GOOGLE_PUBSUB_PROJECT_ID") {
2853 self.adapter.google_pubsub.project_id = project_id;
2854 }
2855 if let Ok(prefix) = std::env::var("GOOGLE_PUBSUB_PREFIX") {
2856 self.adapter.google_pubsub.prefix = prefix;
2857 }
2858 if let Ok(emulator_host) = std::env::var("PUBSUB_EMULATOR_HOST") {
2859 self.adapter.google_pubsub.emulator_host = Some(emulator_host);
2860 }
2861 self.adapter.google_pubsub.request_timeout_ms = parse_env::<u64>(
2862 "GOOGLE_PUBSUB_REQUEST_TIMEOUT_MS",
2863 self.adapter.google_pubsub.request_timeout_ms,
2864 );
2865 if let Some(nodes) = parse_env_optional::<u32>("GOOGLE_PUBSUB_NODES_NUMBER") {
2866 self.adapter.google_pubsub.nodes_number = Some(nodes);
2867 }
2868
2869 if let Ok(brokers) = std::env::var("KAFKA_BROKERS") {
2871 self.adapter.kafka.brokers = brokers.split(',').map(|s| s.trim().to_string()).collect();
2872 }
2873 if let Ok(prefix) = std::env::var("KAFKA_PREFIX") {
2874 self.adapter.kafka.prefix = prefix;
2875 }
2876 if let Ok(protocol) = std::env::var("KAFKA_SECURITY_PROTOCOL") {
2877 self.adapter.kafka.security_protocol = Some(protocol);
2878 }
2879 if let Ok(mechanism) = std::env::var("KAFKA_SASL_MECHANISM") {
2880 self.adapter.kafka.sasl_mechanism = Some(mechanism);
2881 }
2882 if let Ok(username) = std::env::var("KAFKA_SASL_USERNAME") {
2883 self.adapter.kafka.sasl_username = Some(username);
2884 }
2885 if let Ok(password) = std::env::var("KAFKA_SASL_PASSWORD") {
2886 self.adapter.kafka.sasl_password = Some(password);
2887 }
2888 self.adapter.kafka.request_timeout_ms = parse_env::<u64>(
2889 "KAFKA_REQUEST_TIMEOUT_MS",
2890 self.adapter.kafka.request_timeout_ms,
2891 );
2892 if let Some(nodes) = parse_env_optional::<u32>("KAFKA_NODES_NUMBER") {
2893 self.adapter.kafka.nodes_number = Some(nodes);
2894 }
2895
2896 if let Ok(connection_string) = std::env::var("IGGY_CONNECTION_STRING") {
2898 self.adapter.iggy.connection_string = connection_string.clone();
2899 self.queue.iggy.connection_string = connection_string;
2900 }
2901 if let Ok(username) = std::env::var("IGGY_USERNAME") {
2902 let username = (!username.is_empty()).then_some(username);
2903 self.adapter.iggy.username = username.clone();
2904 self.queue.iggy.username = username;
2905 }
2906 if let Ok(password) = std::env::var("IGGY_PASSWORD") {
2907 let password = (!password.is_empty()).then_some(password);
2908 self.adapter.iggy.password = password.clone();
2909 self.queue.iggy.password = password;
2910 }
2911 if let Ok(consumer_name) = std::env::var("IGGY_CONSUMER_NAME") {
2912 let consumer_name = (!consumer_name.is_empty()).then_some(consumer_name);
2913 self.adapter.iggy.consumer_name = consumer_name.clone();
2914 self.queue.iggy.consumer_name = consumer_name;
2915 } else if let Ok(process_id) = std::env::var("INSTANCE_PROCESS_ID") {
2916 let process_id = (!process_id.is_empty()).then_some(process_id);
2917 self.adapter.iggy.consumer_name = process_id.clone();
2918 self.queue.iggy.consumer_name = process_id;
2919 }
2920 if let Ok(stream) = std::env::var("IGGY_STREAM") {
2921 self.adapter.iggy.stream = stream.clone();
2922 self.queue.iggy.stream = stream;
2923 }
2924 if let Ok(prefix) = std::env::var("IGGY_TOPIC_PREFIX") {
2925 self.adapter.iggy.topic_prefix = prefix;
2926 }
2927 if let Ok(prefix) = std::env::var("IGGY_QUEUE_TOPIC_PREFIX") {
2928 self.queue.iggy.queue_topic_prefix = prefix;
2929 }
2930 if let Ok(prefix) = std::env::var("IGGY_CONSUMER_GROUP_PREFIX") {
2931 self.queue.iggy.consumer_group_prefix = prefix;
2932 }
2933 self.adapter.iggy.request_timeout_ms = parse_env::<u64>(
2934 "IGGY_REQUEST_TIMEOUT_MS",
2935 self.adapter.iggy.request_timeout_ms,
2936 );
2937 self.queue.iggy.request_timeout_ms = parse_env::<u64>(
2938 "IGGY_REQUEST_TIMEOUT_MS",
2939 self.queue.iggy.request_timeout_ms,
2940 );
2941 self.adapter.iggy.poll_interval_ms =
2942 parse_env::<u64>("IGGY_POLL_INTERVAL_MS", self.adapter.iggy.poll_interval_ms);
2943 self.queue.iggy.poll_interval_ms =
2944 parse_env::<u64>("IGGY_POLL_INTERVAL_MS", self.queue.iggy.poll_interval_ms);
2945 self.adapter.iggy.poll_batch_size =
2946 parse_env::<u32>("IGGY_POLL_BATCH_SIZE", self.adapter.iggy.poll_batch_size);
2947 self.queue.iggy.poll_batch_size =
2948 parse_env::<u32>("IGGY_POLL_BATCH_SIZE", self.queue.iggy.poll_batch_size);
2949 self.adapter.iggy.partitions_count = parse_env::<u32>(
2950 "ADAPTER_IGGY_PARTITIONS_COUNT",
2951 parse_env::<u32>("IGGY_PARTITIONS_COUNT", self.adapter.iggy.partitions_count),
2952 );
2953 self.queue.iggy.partitions_count = parse_env::<u32>(
2954 "QUEUE_IGGY_PARTITIONS_COUNT",
2955 parse_env::<u32>("IGGY_PARTITIONS_COUNT", self.queue.iggy.partitions_count),
2956 );
2957 self.adapter.iggy.partition_id = parse_env::<u32>(
2958 "ADAPTER_IGGY_PARTITION_ID",
2959 parse_env::<u32>("IGGY_PARTITION_ID", self.adapter.iggy.partition_id),
2960 );
2961 self.queue.iggy.partition_id = parse_env::<u32>(
2962 "QUEUE_IGGY_PARTITION_ID",
2963 parse_env::<u32>("IGGY_PARTITION_ID", self.queue.iggy.partition_id),
2964 );
2965 self.adapter.iggy.auto_create =
2966 parse_env::<bool>("IGGY_AUTO_CREATE", self.adapter.iggy.auto_create);
2967 self.queue.iggy.auto_create =
2968 parse_env::<bool>("IGGY_AUTO_CREATE", self.queue.iggy.auto_create);
2969 self.adapter.iggy.start_from_latest = parse_env::<bool>(
2970 "IGGY_START_FROM_LATEST",
2971 self.adapter.iggy.start_from_latest,
2972 );
2973 if let Some(nodes) = parse_env_optional::<u32>("IGGY_NODES_NUMBER") {
2974 self.adapter.iggy.nodes_number = Some(nodes);
2975 self.queue.iggy.nodes_number = Some(nodes);
2976 }
2977
2978 if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2980 let parsed: Vec<String> = origins.split(',').map(|s| s.trim().to_string()).collect();
2981 if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&parsed) {
2982 warn!(
2983 "CORS_ORIGINS contains invalid patterns: {}. Keeping previous CORS origins.",
2984 e
2985 );
2986 } else {
2987 self.cors.origin = parsed;
2988 }
2989 }
2990 if let Ok(methods) = std::env::var("CORS_METHODS") {
2991 self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2992 }
2993 if let Ok(headers) = std::env::var("CORS_HEADERS") {
2994 self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2995 }
2996 self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2997
2998 self.database_pooling.enabled =
3000 parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
3001 if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
3002 self.database_pooling.min = min;
3003 }
3004 if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
3005 self.database_pooling.max = max;
3006 }
3007
3008 if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
3009 self.database.mysql.connection_pool_size = pool_size;
3010 self.database.postgres.connection_pool_size = pool_size;
3011 }
3012 if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
3013 self.app_manager.cache.ttl = cache_ttl;
3014 self.channel_limits.cache_ttl = cache_ttl;
3015 self.database.mysql.cache_ttl = cache_ttl;
3016 self.database.postgres.cache_ttl = cache_ttl;
3017 self.database.surrealdb.cache_ttl = cache_ttl;
3018 self.cache.memory.ttl = cache_ttl;
3019 }
3020 if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
3021 self.database.mysql.cache_cleanup_interval = cleanup_interval;
3022 self.database.postgres.cache_cleanup_interval = cleanup_interval;
3023 self.cache.memory.cleanup_interval = cleanup_interval;
3024 }
3025 if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
3026 self.database.mysql.cache_max_capacity = max_capacity;
3027 self.database.postgres.cache_max_capacity = max_capacity;
3028 self.database.surrealdb.cache_max_capacity = max_capacity;
3029 self.cache.memory.max_capacity = max_capacity;
3030 }
3031
3032 let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
3033 let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
3034 let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
3035 let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
3036
3037 if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
3038 (default_app_id, default_app_key, default_app_secret)
3039 && default_app_enabled
3040 {
3041 let default_app = App::from_policy(
3042 app_id,
3043 app_key,
3044 app_secret,
3045 default_app_enabled,
3046 crate::app::AppPolicy {
3047 limits: crate::app::AppLimitsPolicy {
3048 max_connections: parse_env::<u32>(
3049 "SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS",
3050 100,
3051 ),
3052 max_backend_events_per_second: Some(parse_env::<u32>(
3053 "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
3054 100,
3055 )),
3056 max_client_events_per_second: parse_env::<u32>(
3057 "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
3058 100,
3059 ),
3060 max_read_requests_per_second: Some(parse_env::<u32>(
3061 "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
3062 100,
3063 )),
3064 max_presence_members_per_channel: Some(parse_env::<u32>(
3065 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
3066 100,
3067 )),
3068 max_presence_member_size_in_kb: Some(parse_env::<u32>(
3069 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
3070 100,
3071 )),
3072 max_channel_name_length: Some(parse_env::<u32>(
3073 "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
3074 100,
3075 )),
3076 max_event_channels_at_once: Some(parse_env::<u32>(
3077 "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
3078 100,
3079 )),
3080 max_event_name_length: Some(parse_env::<u32>(
3081 "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
3082 100,
3083 )),
3084 max_event_payload_in_kb: Some(parse_env::<u32>(
3085 "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
3086 100,
3087 )),
3088 max_event_batch_size: Some(parse_env::<u32>(
3089 "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
3090 100,
3091 )),
3092 decay_seconds: None,
3093 terminate_on_limit: false,
3094 message_rate_limit: None,
3095 },
3096 features: crate::app::AppFeaturesPolicy {
3097 enable_client_messages: std::env::var(
3098 "SOCKUDO_DEFAULT_APP_ENABLE_CLIENT_MESSAGES",
3099 )
3100 .ok()
3101 .map(|value| {
3102 matches!(
3103 value.trim().to_ascii_lowercase().as_str(),
3104 "true" | "1" | "yes" | "on"
3105 )
3106 })
3107 .unwrap_or_else(|| parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false)),
3108 enable_user_authentication: Some(parse_bool_env(
3109 "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
3110 false,
3111 )),
3112 enable_watchlist_events: Some(parse_bool_env(
3113 "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
3114 false,
3115 )),
3116 },
3117 channels: crate::app::AppChannelsPolicy {
3118 allowed_origins: {
3119 if let Ok(origins_str) =
3120 std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS")
3121 {
3122 if !origins_str.is_empty() {
3123 Some(
3124 origins_str
3125 .split(',')
3126 .map(|s| s.trim().to_string())
3127 .collect(),
3128 )
3129 } else {
3130 None
3131 }
3132 } else {
3133 None
3134 }
3135 },
3136 annotations_enabled: Some(parse_bool_env(
3137 "SOCKUDO_DEFAULT_APP_ANNOTATIONS_ENABLED",
3138 false,
3139 )),
3140 channel_delta_compression: None,
3141 channel_namespaces: None,
3142 },
3143 webhooks: None,
3144 idempotency: None,
3145 connection_recovery: None,
3146 history: None,
3147 presence_history: None,
3148 },
3149 );
3150
3151 self.app_manager.array.apps.push(default_app);
3152 info!("Successfully registered default app from env");
3153 }
3154
3155 if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
3157 info!("Applying REDIS_URL environment variable override");
3158
3159 let redis_url_json = sonic_rs::json!(redis_url_env);
3160
3161 self.adapter
3162 .redis
3163 .redis_pub_options
3164 .insert("url".to_string(), redis_url_json.clone());
3165 self.adapter
3166 .redis
3167 .redis_sub_options
3168 .insert("url".to_string(), redis_url_json);
3169
3170 self.cache.redis.url_override = Some(redis_url_env.clone());
3171 self.queue.redis.url_override = Some(redis_url_env.clone());
3172 self.rate_limiter.redis.url_override = Some(redis_url_env);
3173 }
3174
3175 let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
3177 let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
3178 if has_colors_env || has_target_env {
3179 let logging_config = self.logging.get_or_insert_with(Default::default);
3180 if has_colors_env {
3181 logging_config.colors_enabled =
3182 parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
3183 }
3184 if has_target_env {
3185 logging_config.include_target =
3186 parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
3187 }
3188 }
3189
3190 self.cleanup.async_enabled =
3192 parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
3193 self.cleanup.fallback_to_sync =
3194 parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
3195 self.cleanup.queue_buffer_size =
3196 parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
3197 self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
3198 self.cleanup.batch_timeout_ms =
3199 parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
3200 if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
3201 self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
3202 WorkerThreadsConfig::Auto
3203 } else if let Ok(n) = worker_threads_str.parse::<usize>() {
3204 WorkerThreadsConfig::Fixed(n)
3205 } else {
3206 warn!(
3207 "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
3208 worker_threads_str
3209 );
3210 self.cleanup.worker_threads.clone()
3211 };
3212 }
3213 self.cleanup.max_retry_attempts = parse_env::<u32>(
3214 "CLEANUP_MAX_RETRY_ATTEMPTS",
3215 self.cleanup.max_retry_attempts,
3216 );
3217
3218 self.cluster_health.enabled =
3220 parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
3221 self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
3222 "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
3223 self.cluster_health.heartbeat_interval_ms,
3224 );
3225 self.cluster_health.node_timeout_ms = parse_env::<u64>(
3226 "CLUSTER_HEALTH_NODE_TIMEOUT",
3227 self.cluster_health.node_timeout_ms,
3228 );
3229 self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
3230 "CLUSTER_HEALTH_CLEANUP_INTERVAL",
3231 self.cluster_health.cleanup_interval_ms,
3232 );
3233
3234 self.tag_filtering.enabled =
3236 parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
3237
3238 if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
3240 if val.to_lowercase() == "none" || val == "0" {
3241 self.websocket.max_messages = None;
3242 } else if let Ok(n) = val.parse::<usize>() {
3243 self.websocket.max_messages = Some(n);
3244 }
3245 }
3246 if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
3247 if val.to_lowercase() == "none" || val == "0" {
3248 self.websocket.max_bytes = None;
3249 } else if let Ok(n) = val.parse::<usize>() {
3250 self.websocket.max_bytes = Some(n);
3251 }
3252 }
3253 self.websocket.disconnect_on_buffer_full = parse_bool_env(
3254 "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
3255 self.websocket.disconnect_on_buffer_full,
3256 );
3257 self.websocket.max_message_size = parse_env::<usize>(
3258 "WEBSOCKET_MAX_MESSAGE_SIZE",
3259 self.websocket.max_message_size,
3260 );
3261 self.websocket.max_frame_size =
3262 parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
3263 self.websocket.write_buffer_size = parse_env::<usize>(
3264 "WEBSOCKET_WRITE_BUFFER_SIZE",
3265 self.websocket.write_buffer_size,
3266 );
3267 self.websocket.max_backpressure = parse_env::<usize>(
3268 "WEBSOCKET_MAX_BACKPRESSURE",
3269 self.websocket.max_backpressure,
3270 );
3271 self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
3272 self.websocket.ping_interval =
3273 parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
3274 self.websocket.idle_timeout =
3275 parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
3276 if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
3277 self.websocket.compression = mode;
3278 }
3279
3280 self.connection_recovery.enabled = parse_bool_env(
3282 "CONNECTION_RECOVERY_ENABLED",
3283 self.connection_recovery.enabled,
3284 );
3285 self.connection_recovery.buffer_ttl_seconds = parse_env::<u64>(
3286 "CONNECTION_RECOVERY_BUFFER_TTL",
3287 self.connection_recovery.buffer_ttl_seconds,
3288 );
3289 self.connection_recovery.max_buffer_size = parse_env::<usize>(
3290 "CONNECTION_RECOVERY_MAX_BUFFER_SIZE",
3291 self.connection_recovery.max_buffer_size,
3292 );
3293
3294 self.history.enabled = parse_bool_env("HISTORY_ENABLED", self.history.enabled);
3295 self.history.rewind_enabled =
3296 parse_bool_env("HISTORY_REWIND_ENABLED", self.history.rewind_enabled);
3297 self.history.retention_window_seconds = parse_env::<u64>(
3298 "HISTORY_RETENTION_WINDOW_SECONDS",
3299 self.history.retention_window_seconds,
3300 );
3301 self.history.max_page_size =
3302 parse_env::<usize>("HISTORY_MAX_PAGE_SIZE", self.history.max_page_size);
3303 self.history.writer_shards =
3304 parse_env::<usize>("HISTORY_WRITER_SHARDS", self.history.writer_shards);
3305 self.history.writer_queue_capacity = parse_env::<usize>(
3306 "HISTORY_WRITER_QUEUE_CAPACITY",
3307 self.history.writer_queue_capacity,
3308 );
3309 if let Ok(backend) = std::env::var("HISTORY_BACKEND") {
3310 self.history.backend = HistoryBackend::from_str(&backend)?;
3311 }
3312 if let Ok(max_messages) = std::env::var("HISTORY_MAX_MESSAGES_PER_CHANNEL") {
3313 self.history.max_messages_per_channel = Some(
3314 max_messages
3315 .parse::<usize>()
3316 .map_err(|e| format!("Invalid HISTORY_MAX_MESSAGES_PER_CHANNEL: {e}"))?,
3317 );
3318 }
3319 if let Ok(max_bytes) = std::env::var("HISTORY_MAX_BYTES_PER_CHANNEL") {
3320 self.history.max_bytes_per_channel = Some(
3321 max_bytes
3322 .parse::<u64>()
3323 .map_err(|e| format!("Invalid HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
3324 );
3325 }
3326 if let Ok(table_prefix) = std::env::var("HISTORY_POSTGRES_TABLE_PREFIX") {
3327 self.history.postgres.table_prefix = table_prefix;
3328 }
3329 self.history.postgres.write_timeout_ms = parse_env::<u64>(
3330 "HISTORY_POSTGRES_WRITE_TIMEOUT_MS",
3331 self.history.postgres.write_timeout_ms,
3332 );
3333 self.history.purge_interval_seconds = parse_env::<u64>(
3334 "HISTORY_PURGE_INTERVAL_SECONDS",
3335 self.history.purge_interval_seconds,
3336 );
3337 self.history.purge_batch_size =
3338 parse_env::<usize>("HISTORY_PURGE_BATCH_SIZE", self.history.purge_batch_size);
3339 self.history.max_purge_per_tick = parse_env::<usize>(
3340 "HISTORY_MAX_PURGE_PER_TICK",
3341 self.history.max_purge_per_tick,
3342 );
3343
3344 self.presence_history.enabled =
3345 parse_bool_env("PRESENCE_HISTORY_ENABLED", self.presence_history.enabled);
3346 self.presence_history.retention_window_seconds = parse_env::<u64>(
3347 "PRESENCE_HISTORY_RETENTION_WINDOW_SECONDS",
3348 self.presence_history.retention_window_seconds,
3349 );
3350 self.presence_history.max_page_size = parse_env::<usize>(
3351 "PRESENCE_HISTORY_MAX_PAGE_SIZE",
3352 self.presence_history.max_page_size,
3353 );
3354 if let Ok(max_events) = std::env::var("PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL") {
3355 self.presence_history.max_events_per_channel =
3356 Some(max_events.parse::<usize>().map_err(|e| {
3357 format!("Invalid PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL: {e}")
3358 })?);
3359 }
3360 if let Ok(max_bytes) = std::env::var("PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL") {
3361 self.presence_history.max_bytes_per_channel = Some(
3362 max_bytes
3363 .parse::<u64>()
3364 .map_err(|e| format!("Invalid PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
3365 );
3366 }
3367 self.idempotency.enabled = parse_bool_env("IDEMPOTENCY_ENABLED", self.idempotency.enabled);
3368 self.idempotency.ttl_seconds =
3369 parse_env::<u64>("IDEMPOTENCY_TTL_SECONDS", self.idempotency.ttl_seconds);
3370 self.idempotency.max_key_length = parse_env::<usize>(
3371 "IDEMPOTENCY_MAX_KEY_LENGTH",
3372 self.idempotency.max_key_length,
3373 );
3374
3375 self.ephemeral.enabled = parse_bool_env("EPHEMERAL_ENABLED", self.ephemeral.enabled);
3376
3377 self.echo_control.enabled =
3378 parse_bool_env("ECHO_CONTROL_ENABLED", self.echo_control.enabled);
3379 self.echo_control.default_echo_messages = parse_bool_env(
3380 "ECHO_CONTROL_DEFAULT_ECHO_MESSAGES",
3381 self.echo_control.default_echo_messages,
3382 );
3383
3384 self.event_name_filtering.enabled = parse_bool_env(
3385 "EVENT_NAME_FILTERING_ENABLED",
3386 self.event_name_filtering.enabled,
3387 );
3388 self.event_name_filtering.max_events_per_filter = parse_env::<usize>(
3389 "EVENT_NAME_FILTERING_MAX_EVENTS_PER_FILTER",
3390 self.event_name_filtering.max_events_per_filter,
3391 );
3392 self.event_name_filtering.max_event_name_length = parse_env::<usize>(
3393 "EVENT_NAME_FILTERING_MAX_EVENT_NAME_LENGTH",
3394 self.event_name_filtering.max_event_name_length,
3395 );
3396 self.versioned_messages.enabled = parse_bool_env(
3397 "VERSIONED_MESSAGES_ENABLED",
3398 self.versioned_messages.enabled,
3399 );
3400 if let Ok(driver_str) = std::env::var("VERSIONED_MESSAGES_DRIVER") {
3401 self.versioned_messages.driver = parse_driver_enum(
3402 driver_str,
3403 self.versioned_messages.driver.clone(),
3404 "VersionedMessages Backend",
3405 );
3406 }
3407 self.versioned_messages.max_page_size = parse_env::<usize>(
3408 "VERSIONED_MESSAGES_MAX_PAGE_SIZE",
3409 self.versioned_messages.max_page_size,
3410 );
3411 self.versioned_messages.retention_window_seconds = parse_env::<u64>(
3412 "VERSIONED_MESSAGES_RETENTION_WINDOW_SECONDS",
3413 self.versioned_messages.retention_window_seconds,
3414 );
3415 self.versioned_messages.purge_interval_seconds = parse_env::<u64>(
3416 "VERSIONED_MESSAGES_PURGE_INTERVAL_SECONDS",
3417 self.versioned_messages.purge_interval_seconds,
3418 );
3419 self.versioned_messages.purge_batch_size = parse_env::<usize>(
3420 "VERSIONED_MESSAGES_PURGE_BATCH_SIZE",
3421 self.versioned_messages.purge_batch_size,
3422 );
3423 self.versioned_messages.max_purge_per_tick = parse_env::<usize>(
3424 "VERSIONED_MESSAGES_MAX_PURGE_PER_TICK",
3425 self.versioned_messages.max_purge_per_tick,
3426 );
3427 self.annotations.enabled = parse_bool_env("ANNOTATIONS_ENABLED", self.annotations.enabled);
3428
3429 Ok(())
3430 }
3431
3432 pub fn validate(&self) -> Result<(), String> {
3433 if self.unix_socket.enabled {
3434 if self.unix_socket.path.is_empty() {
3435 return Err(
3436 "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
3437 );
3438 }
3439
3440 self.validate_unix_socket_security()?;
3441
3442 if self.ssl.enabled {
3443 tracing::warn!(
3444 "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
3445 );
3446 }
3447
3448 if self.unix_socket.permission_mode > 0o777 {
3449 return Err(format!(
3450 "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
3451 self.unix_socket.permission_mode
3452 ));
3453 }
3454 }
3455
3456 if let Err(e) = self.cleanup.validate() {
3457 return Err(format!("Invalid cleanup configuration: {}", e));
3458 }
3459
3460 if self.history.enabled {
3461 if self.history.max_page_size == 0 {
3462 return Err("history.max_page_size must be greater than 0".to_string());
3463 }
3464 if self.history.writer_shards == 0 {
3465 return Err("history.writer_shards must be greater than 0".to_string());
3466 }
3467 if self.history.writer_queue_capacity == 0 {
3468 return Err("history.writer_queue_capacity must be greater than 0".to_string());
3469 }
3470 if self.history.retention_window_seconds == 0 {
3471 return Err("history.retention_window_seconds must be greater than 0".to_string());
3472 }
3473 if self.history.postgres.table_prefix.trim().is_empty() {
3474 return Err("history.postgres.table_prefix must not be empty".to_string());
3475 }
3476 }
3477
3478 if self.presence_history.enabled {
3479 if self.presence_history.max_page_size == 0 {
3480 return Err("presence_history.max_page_size must be greater than 0".to_string());
3481 }
3482 if self.presence_history.retention_window_seconds == 0 {
3483 return Err(
3484 "presence_history.retention_window_seconds must be greater than 0".to_string(),
3485 );
3486 }
3487 }
3488
3489 if self.versioned_messages.enabled && self.versioned_messages.max_page_size == 0 {
3490 return Err("versioned_messages.max_page_size must be greater than 0".to_string());
3491 }
3492 if self.annotations.enabled && !self.versioned_messages.enabled {
3493 return Err("annotations require versioned_messages.enabled".to_string());
3494 }
3495
3496 Ok(())
3497 }
3498
3499 fn validate_unix_socket_security(&self) -> Result<(), String> {
3500 let path = &self.unix_socket.path;
3501
3502 if path.contains("../") || path.contains("..\\") {
3503 return Err(
3504 "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
3505 );
3506 }
3507
3508 if self.unix_socket.permission_mode & 0o002 != 0 {
3509 warn!(
3510 "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
3511 self.unix_socket.permission_mode
3512 );
3513 }
3514
3515 if self.unix_socket.permission_mode & 0o007 > 0o005 {
3516 warn!(
3517 "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
3518 self.unix_socket.permission_mode
3519 );
3520 }
3521
3522 if self.mode == "production" && path.starts_with("/tmp/") {
3523 warn!(
3524 "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
3525 path
3526 );
3527 }
3528
3529 if !path.starts_with('/') {
3530 return Err(
3531 "Unix socket path must be absolute (start with /) for security and reliability."
3532 .to_string(),
3533 );
3534 }
3535
3536 Ok(())
3537 }
3538}
3539
3540#[cfg(test)]
3541mod redis_connection_tests {
3542 use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};
3543
3544 #[test]
3545 fn test_standard_url_basic() {
3546 let conn = RedisConnection {
3547 host: "127.0.0.1".to_string(),
3548 port: 6379,
3549 db: 0,
3550 username: None,
3551 password: None,
3552 key_prefix: "sockudo:".to_string(),
3553 sentinels: Vec::new(),
3554 sentinel_password: None,
3555 name: "mymaster".to_string(),
3556 cluster: RedisClusterConnection::default(),
3557 cluster_nodes: Vec::new(),
3558 };
3559 assert_eq!(conn.to_url(), "redis://127.0.0.1:6379/0");
3560 }
3561
3562 #[test]
3563 fn test_standard_url_with_password() {
3564 let conn = RedisConnection {
3565 host: "127.0.0.1".to_string(),
3566 port: 6379,
3567 db: 2,
3568 username: None,
3569 password: Some("secret".to_string()),
3570 key_prefix: "sockudo:".to_string(),
3571 sentinels: Vec::new(),
3572 sentinel_password: None,
3573 name: "mymaster".to_string(),
3574 cluster: RedisClusterConnection::default(),
3575 cluster_nodes: Vec::new(),
3576 };
3577 assert_eq!(conn.to_url(), "redis://:secret@127.0.0.1:6379/2");
3578 }
3579
3580 #[test]
3581 fn test_standard_url_with_username_and_password() {
3582 let conn = RedisConnection {
3583 host: "redis.example.com".to_string(),
3584 port: 6380,
3585 db: 1,
3586 username: Some("admin".to_string()),
3587 password: Some("pass123".to_string()),
3588 key_prefix: "sockudo:".to_string(),
3589 sentinels: Vec::new(),
3590 sentinel_password: None,
3591 name: "mymaster".to_string(),
3592 cluster: RedisClusterConnection::default(),
3593 cluster_nodes: Vec::new(),
3594 };
3595 assert_eq!(
3596 conn.to_url(),
3597 "redis://admin:pass123@redis.example.com:6380/1"
3598 );
3599 }
3600
3601 #[test]
3602 fn test_standard_url_with_special_chars_in_password() {
3603 let conn = RedisConnection {
3604 host: "127.0.0.1".to_string(),
3605 port: 6379,
3606 db: 0,
3607 username: None,
3608 password: Some("pass@word#123".to_string()),
3609 key_prefix: "sockudo:".to_string(),
3610 sentinels: Vec::new(),
3611 sentinel_password: None,
3612 name: "mymaster".to_string(),
3613 cluster: RedisClusterConnection::default(),
3614 cluster_nodes: Vec::new(),
3615 };
3616 assert_eq!(conn.to_url(), "redis://:pass%40word%23123@127.0.0.1:6379/0");
3617 }
3618
3619 #[test]
3620 fn test_is_sentinel_configured_false() {
3621 let conn = RedisConnection::default();
3622 assert!(!conn.is_sentinel_configured());
3623 }
3624
3625 #[test]
3626 fn test_is_sentinel_configured_true() {
3627 let conn = RedisConnection {
3628 sentinels: vec![RedisSentinel {
3629 host: "sentinel1".to_string(),
3630 port: 26379,
3631 }],
3632 ..Default::default()
3633 };
3634 assert!(conn.is_sentinel_configured());
3635 }
3636
3637 #[test]
3638 fn test_sentinel_url_basic() {
3639 let conn = RedisConnection {
3640 host: "127.0.0.1".to_string(),
3641 port: 6379,
3642 db: 0,
3643 username: None,
3644 password: None,
3645 key_prefix: "sockudo:".to_string(),
3646 sentinels: vec![
3647 RedisSentinel {
3648 host: "sentinel1".to_string(),
3649 port: 26379,
3650 },
3651 RedisSentinel {
3652 host: "sentinel2".to_string(),
3653 port: 26379,
3654 },
3655 ],
3656 sentinel_password: None,
3657 name: "mymaster".to_string(),
3658 cluster: RedisClusterConnection::default(),
3659 cluster_nodes: Vec::new(),
3660 };
3661 assert_eq!(
3662 conn.to_url(),
3663 "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0"
3664 );
3665 }
3666
3667 #[test]
3668 fn test_sentinel_url_with_sentinel_password() {
3669 let conn = RedisConnection {
3670 host: "127.0.0.1".to_string(),
3671 port: 6379,
3672 db: 0,
3673 username: None,
3674 password: None,
3675 key_prefix: "sockudo:".to_string(),
3676 sentinels: vec![RedisSentinel {
3677 host: "sentinel1".to_string(),
3678 port: 26379,
3679 }],
3680 sentinel_password: Some("sentinelpass".to_string()),
3681 name: "mymaster".to_string(),
3682 cluster: RedisClusterConnection::default(),
3683 cluster_nodes: Vec::new(),
3684 };
3685 assert_eq!(
3686 conn.to_url(),
3687 "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0"
3688 );
3689 }
3690
3691 #[test]
3692 fn test_sentinel_url_with_master_password() {
3693 let conn = RedisConnection {
3694 host: "127.0.0.1".to_string(),
3695 port: 6379,
3696 db: 1,
3697 username: None,
3698 password: Some("masterpass".to_string()),
3699 key_prefix: "sockudo:".to_string(),
3700 sentinels: vec![RedisSentinel {
3701 host: "sentinel1".to_string(),
3702 port: 26379,
3703 }],
3704 sentinel_password: None,
3705 name: "mymaster".to_string(),
3706 cluster: RedisClusterConnection::default(),
3707 cluster_nodes: Vec::new(),
3708 };
3709 assert_eq!(
3710 conn.to_url(),
3711 "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass"
3712 );
3713 }
3714
3715 #[test]
3716 fn test_sentinel_url_with_all_auth() {
3717 let conn = RedisConnection {
3718 host: "127.0.0.1".to_string(),
3719 port: 6379,
3720 db: 2,
3721 username: Some("redisuser".to_string()),
3722 password: Some("redispass".to_string()),
3723 key_prefix: "sockudo:".to_string(),
3724 sentinels: vec![
3725 RedisSentinel {
3726 host: "sentinel1".to_string(),
3727 port: 26379,
3728 },
3729 RedisSentinel {
3730 host: "sentinel2".to_string(),
3731 port: 26380,
3732 },
3733 ],
3734 sentinel_password: Some("sentinelauth".to_string()),
3735 name: "production-master".to_string(),
3736 cluster: RedisClusterConnection::default(),
3737 cluster_nodes: Vec::new(),
3738 };
3739 assert_eq!(
3740 conn.to_url(),
3741 "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser"
3742 );
3743 }
3744
3745 #[test]
3746 fn test_sentinel_to_host_port() {
3747 let sentinel = RedisSentinel {
3748 host: "sentinel.example.com".to_string(),
3749 port: 26379,
3750 };
3751 assert_eq!(sentinel.to_host_port(), "sentinel.example.com:26379");
3752 }
3753
3754 #[test]
3755 fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
3756 let conn = RedisConnection {
3757 cluster: RedisClusterConnection {
3758 nodes: vec![
3759 ClusterNode {
3760 host: "node1.secure-cluster.com".to_string(),
3761 port: 7000,
3762 },
3763 ClusterNode {
3764 host: "redis://node2.secure-cluster.com:7001".to_string(),
3765 port: 7001,
3766 },
3767 ClusterNode {
3768 host: "rediss://node3.secure-cluster.com".to_string(),
3769 port: 7002,
3770 },
3771 ],
3772 username: None,
3773 password: Some("cluster-secret".to_string()),
3774 use_tls: true,
3775 },
3776 ..Default::default()
3777 };
3778
3779 assert_eq!(
3780 conn.cluster_node_urls(),
3781 vec![
3782 "rediss://:cluster-secret@node1.secure-cluster.com:7000",
3783 "redis://:cluster-secret@node2.secure-cluster.com:7001",
3784 "rediss://:cluster-secret@node3.secure-cluster.com:7002",
3785 ]
3786 );
3787 }
3788
3789 #[test]
3790 fn test_cluster_node_urls_fallback_to_legacy_nodes() {
3791 let conn = RedisConnection {
3792 password: Some("fallback-secret".to_string()),
3793 cluster_nodes: vec![ClusterNode {
3794 host: "legacy-node.example.com".to_string(),
3795 port: 7000,
3796 }],
3797 ..Default::default()
3798 };
3799
3800 assert_eq!(
3801 conn.cluster_node_urls(),
3802 vec!["redis://:fallback-secret@legacy-node.example.com:7000"]
3803 );
3804 }
3805
3806 #[test]
3807 fn test_normalize_cluster_seed_urls() {
3808 let conn = RedisConnection {
3809 cluster: RedisClusterConnection {
3810 nodes: Vec::new(),
3811 username: Some("svc-user".to_string()),
3812 password: Some("svc-pass".to_string()),
3813 use_tls: true,
3814 },
3815 ..Default::default()
3816 };
3817
3818 let seeds = vec![
3819 "node1.example.com:7000".to_string(),
3820 "redis://node2.example.com:7001".to_string(),
3821 "rediss://node3.example.com".to_string(),
3822 ];
3823
3824 assert_eq!(
3825 conn.normalize_cluster_seed_urls(&seeds),
3826 vec![
3827 "rediss://svc-user:svc-pass@node1.example.com:7000",
3828 "redis://svc-user:svc-pass@node2.example.com:7001",
3829 "rediss://svc-user:svc-pass@node3.example.com:6379",
3830 ]
3831 );
3832 }
3833}
3834
3835#[cfg(test)]
3836mod cluster_node_tests {
3837 use super::ClusterNode;
3838
3839 #[test]
3840 fn test_to_url_basic_host() {
3841 let node = ClusterNode {
3842 host: "localhost".to_string(),
3843 port: 6379,
3844 };
3845 assert_eq!(node.to_url(), "redis://localhost:6379");
3846 }
3847
3848 #[test]
3849 fn test_to_url_ip_address() {
3850 let node = ClusterNode {
3851 host: "127.0.0.1".to_string(),
3852 port: 6379,
3853 };
3854 assert_eq!(node.to_url(), "redis://127.0.0.1:6379");
3855 }
3856
3857 #[test]
3858 fn test_to_url_with_redis_protocol() {
3859 let node = ClusterNode {
3860 host: "redis://example.com".to_string(),
3861 port: 6379,
3862 };
3863 assert_eq!(node.to_url(), "redis://example.com:6379");
3864 }
3865
3866 #[test]
3867 fn test_to_url_with_rediss_protocol() {
3868 let node = ClusterNode {
3869 host: "rediss://secure.example.com".to_string(),
3870 port: 6379,
3871 };
3872 assert_eq!(node.to_url(), "rediss://secure.example.com:6379");
3873 }
3874
3875 #[test]
3876 fn test_to_url_with_rediss_protocol_and_port_in_url() {
3877 let node = ClusterNode {
3878 host: "rediss://secure.example.com:7000".to_string(),
3879 port: 6379,
3880 };
3881 assert_eq!(node.to_url(), "rediss://secure.example.com:7000");
3882 }
3883
3884 #[test]
3885 fn test_to_url_with_redis_protocol_and_port_in_url() {
3886 let node = ClusterNode {
3887 host: "redis://example.com:7001".to_string(),
3888 port: 6379,
3889 };
3890 assert_eq!(node.to_url(), "redis://example.com:7001");
3891 }
3892
3893 #[test]
3894 fn test_to_url_with_trailing_whitespace() {
3895 let node = ClusterNode {
3896 host: " rediss://secure.example.com ".to_string(),
3897 port: 6379,
3898 };
3899 assert_eq!(node.to_url(), "rediss://secure.example.com:6379");
3900 }
3901
3902 #[test]
3903 fn test_to_url_custom_port() {
3904 let node = ClusterNode {
3905 host: "redis-cluster.example.com".to_string(),
3906 port: 7000,
3907 };
3908 assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7000");
3909 }
3910
3911 #[test]
3912 fn test_to_url_plain_host_with_port_in_host_field() {
3913 let node = ClusterNode {
3914 host: "redis-cluster.example.com:7010".to_string(),
3915 port: 7000,
3916 };
3917 assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7010");
3918 }
3919
3920 #[test]
3921 fn test_to_url_with_options_adds_auth_and_tls() {
3922 let node = ClusterNode {
3923 host: "node.example.com".to_string(),
3924 port: 7000,
3925 };
3926 assert_eq!(
3927 node.to_url_with_options(true, Some("svc-user"), Some("secret")),
3928 "rediss://svc-user:secret@node.example.com:7000"
3929 );
3930 }
3931
3932 #[test]
3933 fn test_to_url_with_options_keeps_embedded_auth() {
3934 let node = ClusterNode {
3935 host: "rediss://:node-secret@node.example.com:7000".to_string(),
3936 port: 7000,
3937 };
3938 assert_eq!(
3939 node.to_url_with_options(true, Some("global-user"), Some("global-secret")),
3940 "rediss://:node-secret@node.example.com:7000"
3941 );
3942 }
3943
3944 #[test]
3945 fn test_from_seed_parses_plain_host_port() {
3946 let node = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
3947 assert_eq!(node.host, "cluster-node-1");
3948 assert_eq!(node.port, 7005);
3949 }
3950
3951 #[test]
3952 fn test_from_seed_keeps_scheme_urls() {
3953 let node =
3954 ClusterNode::from_seed("rediss://secure.example.com:7005").expect("node should parse");
3955 assert_eq!(node.host, "rediss://secure.example.com:7005");
3956 assert_eq!(node.port, 7005);
3957 }
3958
3959 #[test]
3960 fn test_to_url_aws_elasticache_hostname() {
3961 let node = ClusterNode {
3962 host: "rediss://my-cluster.use1.cache.amazonaws.com".to_string(),
3963 port: 6379,
3964 };
3965 assert_eq!(
3966 node.to_url(),
3967 "rediss://my-cluster.use1.cache.amazonaws.com:6379"
3968 );
3969 }
3970
3971 #[test]
3972 fn test_to_url_with_ipv6_no_port() {
3973 let node = ClusterNode {
3974 host: "rediss://[::1]".to_string(),
3975 port: 6379,
3976 };
3977 assert_eq!(node.to_url(), "rediss://[::1]:6379");
3978 }
3979
3980 #[test]
3981 fn test_to_url_with_ipv6_and_port_in_url() {
3982 let node = ClusterNode {
3983 host: "rediss://[::1]:7000".to_string(),
3984 port: 6379,
3985 };
3986 assert_eq!(node.to_url(), "rediss://[::1]:7000");
3987 }
3988
3989 #[test]
3990 fn test_to_url_with_ipv6_full_address_no_port() {
3991 let node = ClusterNode {
3992 host: "rediss://[2001:db8::1]".to_string(),
3993 port: 6379,
3994 };
3995 assert_eq!(node.to_url(), "rediss://[2001:db8::1]:6379");
3996 }
3997
3998 #[test]
3999 fn test_to_url_with_ipv6_full_address_with_port() {
4000 let node = ClusterNode {
4001 host: "rediss://[2001:db8::1]:7000".to_string(),
4002 port: 6379,
4003 };
4004 assert_eq!(node.to_url(), "rediss://[2001:db8::1]:7000");
4005 }
4006
4007 #[test]
4008 fn test_to_url_with_redis_protocol_ipv6() {
4009 let node = ClusterNode {
4010 host: "redis://[::1]".to_string(),
4011 port: 6379,
4012 };
4013 assert_eq!(node.to_url(), "redis://[::1]:6379");
4014 }
4015}
4016
4017#[cfg(test)]
4018mod cors_config_tests {
4019 use super::CorsConfig;
4020
4021 fn cors_from_json(json: &str) -> sonic_rs::Result<CorsConfig> {
4022 sonic_rs::from_str(json)
4023 }
4024
4025 #[test]
4026 fn test_deserialize_valid_exact_origins() {
4027 let config =
4028 cors_from_json(r#"{"origin": ["https://example.com", "https://other.com"]}"#).unwrap();
4029 assert_eq!(config.origin.len(), 2);
4030 }
4031
4032 #[test]
4033 fn test_deserialize_valid_wildcard_patterns() {
4034 let config =
4035 cors_from_json(r#"{"origin": ["*.example.com", "https://*.staging.example.com"]}"#)
4036 .unwrap();
4037 assert_eq!(config.origin.len(), 2);
4038 }
4039
4040 #[test]
4041 fn test_deserialize_allows_special_markers() {
4042 assert!(cors_from_json(r#"{"origin": ["*"]}"#).is_ok());
4043 assert!(cors_from_json(r#"{"origin": ["Any"]}"#).is_ok());
4044 assert!(cors_from_json(r#"{"origin": ["any"]}"#).is_ok());
4045 assert!(cors_from_json(r#"{"origin": ["*", "https://example.com"]}"#).is_ok());
4046 }
4047
4048 #[test]
4049 fn test_deserialize_rejects_invalid_patterns() {
4050 assert!(cors_from_json(r#"{"origin": ["*.*bad"]}"#).is_err());
4051 assert!(cors_from_json(r#"{"origin": ["*."]}"#).is_err());
4052 assert!(cors_from_json(r#"{"origin": [""]}"#).is_err());
4053 assert!(cors_from_json(r#"{"origin": ["https://"]}"#).is_err());
4054 }
4055
4056 #[test]
4057 fn test_deserialize_rejects_mixed_valid_and_invalid() {
4058 assert!(cors_from_json(r#"{"origin": ["https://good.com", "*.*bad"]}"#).is_err());
4059 }
4060}