1use std::path::PathBuf;
2
3use serde::{Deserialize, Serialize};
4
5use crate::storage::partition::PartitionGranularity;
6
7#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct ArchiverConfig {
10 #[serde(default = "default_listen_addr")]
11 pub listen_addr: String,
12 #[serde(default = "default_listen_port")]
13 pub listen_port: u16,
14 pub storage: StorageConfig,
15 #[serde(default)]
16 pub engine: EngineConfig,
17
18 #[serde(default)]
19 pub cluster: Option<ClusterConfig>,
20 #[serde(default)]
24 pub failover: Option<FailoverConfig>,
25 #[serde(default)]
28 pub pva: Option<PvaConfig>,
29 #[serde(default)]
33 pub api_keys: Option<Vec<String>>,
34 #[serde(default)]
36 pub security: SecurityConfig,
37 #[serde(default)]
39 pub tls: Option<TlsConfig>,
40}
41
/// Serde default for `ArchiverConfig::listen_addr`.
fn default_listen_addr() -> String {
    String::from("0.0.0.0")
}
45
/// Serde default for `ArchiverConfig::listen_port`.
fn default_listen_port() -> u16 {
    const LISTEN_PORT: u16 = 17_665;
    LISTEN_PORT
}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct StorageConfig {
53 pub sts: TierConfig,
54 pub mts: TierConfig,
55 pub lts: TierConfig,
56 #[serde(default)]
64 pub max_open_writers_total: Option<usize>,
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct TierConfig {
69 pub root_folder: PathBuf,
70 pub partition_granularity: PartitionGranularity,
71 #[serde(default = "default_hold")]
73 pub hold: u32,
74 #[serde(default = "default_gather")]
76 pub gather: u32,
77 #[serde(default)]
84 pub max_open_writers: Option<usize>,
85}
86
/// Serde default for `TierConfig::hold`.
fn default_hold() -> u32 {
    const HOLD: u32 = 5;
    HOLD
}
90
/// Serde default for `TierConfig::gather`.
fn default_gather() -> u32 {
    const GATHER: u32 = 3;
    GATHER
}
94
95#[derive(Debug, Clone, Default, Serialize, Deserialize)]
97pub struct EngineConfig {
98 #[serde(default = "default_write_period")]
100 pub write_period_secs: u64,
101 pub policy_file: Option<PathBuf>,
103 #[serde(default = "default_server_ioc_drift_secs")]
109 pub server_ioc_drift_secs: u64,
110 #[serde(default = "default_write_shards")]
117 pub write_shards: usize,
118 #[serde(default = "default_per_shard_buffer")]
124 pub per_shard_buffer: usize,
125}
126
/// Serde default for `EngineConfig::write_period_secs`.
fn default_write_period() -> u64 {
    const WRITE_PERIOD_SECS: u64 = 10;
    WRITE_PERIOD_SECS
}
130
/// Serde default for `EngineConfig::server_ioc_drift_secs`: thirty minutes, in seconds.
fn default_server_ioc_drift_secs() -> u64 {
    const SECS_PER_MINUTE: u64 = 60;
    const MINUTES: u64 = 30;
    MINUTES * SECS_PER_MINUTE
}
134
/// Serde default for `EngineConfig::write_shards`.
fn default_write_shards() -> usize {
    const WRITE_SHARDS: usize = 1;
    WRITE_SHARDS
}
138
/// Serde default for `EngineConfig::per_shard_buffer`.
fn default_per_shard_buffer() -> usize {
    const PER_SHARD_BUFFER: usize = 4_096;
    PER_SHARD_BUFFER
}
142
143#[derive(Debug, Clone, Serialize, Deserialize)]
145pub struct SecurityConfig {
146 #[serde(default)]
148 pub cors_origins: Vec<String>,
149 #[serde(default = "default_rate_limit_rps")]
151 pub rate_limit_rps: u32,
152 #[serde(default = "default_rate_limit_burst")]
154 pub rate_limit_burst: u32,
155 #[serde(default = "default_max_body_size")]
157 pub max_body_size: usize,
158 #[serde(default)]
162 pub trust_proxy_headers: bool,
163}
164
165impl Default for SecurityConfig {
166 fn default() -> Self {
167 Self {
168 cors_origins: Vec::new(),
169 rate_limit_rps: default_rate_limit_rps(),
170 rate_limit_burst: default_rate_limit_burst(),
171 max_body_size: default_max_body_size(),
172 trust_proxy_headers: false,
173 }
174 }
175}
176
/// Serde default for `SecurityConfig::rate_limit_rps`.
fn default_rate_limit_rps() -> u32 {
    const RATE_LIMIT_RPS: u32 = 100;
    RATE_LIMIT_RPS
}
180
/// Serde default for `SecurityConfig::rate_limit_burst`.
fn default_rate_limit_burst() -> u32 {
    const RATE_LIMIT_BURST: u32 = 200;
    RATE_LIMIT_BURST
}
184
/// Serde default for `SecurityConfig::max_body_size`: 10 MiB, in bytes.
fn default_max_body_size() -> usize {
    const MIB: usize = 1024 * 1024;
    10 * MIB
}
188
189#[derive(Debug, Clone, Serialize, Deserialize)]
191pub struct TlsConfig {
192 pub cert_path: PathBuf,
193 pub key_path: PathBuf,
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize)]
198pub struct ApplianceIdentity {
199 pub name: String,
200 pub mgmt_url: String,
201 pub retrieval_url: String,
202 pub engine_url: String,
203 pub etl_url: String,
204}
205
206#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct PeerConfig {
209 pub name: String,
210 pub mgmt_url: String,
211 pub retrieval_url: String,
212 #[serde(default)]
215 pub api_key: Option<String>,
216}
217
218#[derive(Debug, Clone, Serialize, Deserialize)]
220pub struct ClusterConfig {
221 pub identity: ApplianceIdentity,
222 #[serde(default = "default_cache_ttl")]
223 pub cache_ttl_secs: u64,
224 #[serde(default = "default_peer_timeout")]
225 pub peer_timeout_secs: u64,
226 #[serde(default)]
227 pub peers: Vec<PeerConfig>,
228 #[serde(default)]
232 pub api_key: Option<String>,
233 #[serde(default)]
239 pub reassign_appliance_enabled: bool,
240}
241
/// Serde default for `ClusterConfig::cache_ttl_secs`.
fn default_cache_ttl() -> u64 {
    const CACHE_TTL_SECS: u64 = 300;
    CACHE_TTL_SECS
}
245
/// Serde default for `ClusterConfig::peer_timeout_secs`.
fn default_peer_timeout() -> u64 {
    const PEER_TIMEOUT_SECS: u64 = 30;
    PEER_TIMEOUT_SECS
}
249
250#[derive(Debug, Clone, Serialize, Deserialize)]
258pub struct FailoverConfig {
259 pub peers: Vec<String>,
262 #[serde(default = "default_failover_timeout")]
264 pub timeout_secs: u64,
265}
266
/// Serde default for `FailoverConfig::timeout_secs`.
fn default_failover_timeout() -> u64 {
    const FAILOVER_TIMEOUT_SECS: u64 = 30;
    FAILOVER_TIMEOUT_SECS
}
270
271#[derive(Debug, Clone, Serialize, Deserialize)]
273pub struct PvaConfig {
274 #[serde(default = "default_pva_tcp_port")]
276 pub tcp_port: u16,
277 #[serde(default = "default_pva_udp_port")]
279 pub udp_port: u16,
280}
281
/// Serde default for `PvaConfig::tcp_port`.
fn default_pva_tcp_port() -> u16 {
    const PVA_TCP_PORT: u16 = 5_075;
    PVA_TCP_PORT
}
285
/// Serde default for `PvaConfig::udp_port`.
fn default_pva_udp_port() -> u16 {
    const PVA_UDP_PORT: u16 = 5_076;
    PVA_UDP_PORT
}
289
290impl Default for PvaConfig {
291 fn default() -> Self {
292 Self {
293 tcp_port: default_pva_tcp_port(),
294 udp_port: default_pva_udp_port(),
295 }
296 }
297}
298
299impl ArchiverConfig {
300 pub fn from_toml(s: &str) -> Result<Self, toml::de::Error> {
301 toml::from_str(s)
302 }
303
304 pub fn validate(&self) -> anyhow::Result<()> {
306 if self.engine.write_period_secs == 0 {
307 anyhow::bail!("engine.write_period_secs must be > 0");
314 }
315 if self.engine.write_shards == 0 {
316 anyhow::bail!("engine.write_shards must be > 0 (use 1 for the legacy single-worker layout)");
317 }
318 if self.engine.per_shard_buffer == 0 && self.engine.write_shards > 1 {
319 anyhow::bail!(
320 "engine.per_shard_buffer must be > 0 when write_shards > 1; \
321 a 0-capacity shard channel would drop every sample"
322 );
323 }
324 for (name, tier) in [
325 ("sts", &self.storage.sts),
326 ("mts", &self.storage.mts),
327 ("lts", &self.storage.lts),
328 ] {
329 if tier.gather >= tier.hold {
330 anyhow::bail!(
331 "{name}: gather ({}) must be less than hold ({})",
332 tier.gather,
333 tier.hold,
334 );
335 }
336 }
337 if let Some(ref cluster) = self.cluster {
338 if cluster.peer_timeout_secs == 0 {
339 anyhow::bail!("cluster.peer_timeout_secs must be > 0");
340 }
341 if cluster.cache_ttl_secs == 0 {
342 anyhow::bail!("cluster.cache_ttl_secs must be > 0");
343 }
344 if self.api_keys.is_some() && !cluster.peers.is_empty() {
347 let has_fallback = cluster.api_key.is_some();
348 for (i, peer) in cluster.peers.iter().enumerate() {
349 if peer.api_key.is_none() && !has_fallback {
350 anyhow::bail!(
351 "cluster.peers[{i}] ({}) has no api_key and no cluster.api_key fallback; \
352 proxied write requests to this peer will be rejected",
353 peer.name
354 );
355 }
356 }
357 }
358 for (i, peer) in cluster.peers.iter().enumerate() {
359 if !peer.mgmt_url.starts_with("http://") && !peer.mgmt_url.starts_with("https://") {
360 anyhow::bail!(
361 "cluster.peers[{i}].mgmt_url must start with http:// or https://"
362 );
363 }
364 if !peer.retrieval_url.starts_with("http://")
365 && !peer.retrieval_url.starts_with("https://")
366 {
367 anyhow::bail!(
368 "cluster.peers[{i}].retrieval_url must start with http:// or https://"
369 );
370 }
371 }
372 }
373 Ok(())
374 }
375}
376
#[cfg(test)]
mod tests {
    use super::*;

    // The fixtures below are TOML fragments. Each begins with a blank line and ends with a
    // newline, so concatenating them yields a well-formed document.

    /// Top-level `api_keys` entry; it must come before any table header.
    const API_KEYS: &str = r#"
api_keys = ["secret"]
"#;

    /// The three mandatory storage tiers.
    const STORAGE: &str = r#"
[storage.sts]
root_folder = "/tmp/sts"
partition_granularity = "hour"

[storage.mts]
root_folder = "/tmp/mts"
partition_granularity = "day"

[storage.lts]
root_folder = "/tmp/lts"
partition_granularity = "year"
"#;

    /// Cluster-wide fallback key.
    const CLUSTER_FALLBACK_KEY: &str = r#"
[cluster]
api_key = "shared-fallback"
"#;

    /// Identity of the local appliance.
    const IDENTITY: &str = r#"
[cluster.identity]
name = "appliance0"
mgmt_url = "http://host0:17665/mgmt/bpl"
retrieval_url = "http://host0:17665/retrieval"
engine_url = "http://host0:17665"
etl_url = "http://host0:17665"
"#;

    /// First peer without a key; append an `api_key = ...` line to give it one.
    const PEER1: &str = r#"
[[cluster.peers]]
name = "appliance1"
mgmt_url = "http://host1:17665/mgmt/bpl"
retrieval_url = "http://host1:17665/retrieval"
"#;

    /// Second peer, always without a key.
    const PEER2: &str = r#"
[[cluster.peers]]
name = "appliance2"
mgmt_url = "http://host2:17665/mgmt/bpl"
retrieval_url = "http://host2:17665/retrieval"
"#;

    /// Joins the fragments in order and parses the result, panicking on a parse error.
    fn parse(fragments: &[&str]) -> ArchiverConfig {
        let document = fragments.concat();
        ArchiverConfig::from_toml(&document).unwrap()
    }

    #[test]
    fn parse_config_without_cluster() {
        let config = parse(&[STORAGE]);
        assert!(config.cluster.is_none());
    }

    #[test]
    fn parse_config_with_cluster() {
        let cluster = parse(&[STORAGE, IDENTITY, PEER1]).cluster.unwrap();
        assert_eq!(cluster.identity.name, "appliance0");
        assert_eq!(cluster.peers.len(), 1);
        assert_eq!(cluster.peers[0].name, "appliance1");
        assert_eq!(cluster.cache_ttl_secs, 300);
        assert_eq!(cluster.peer_timeout_secs, 30);
    }

    #[test]
    fn validate_cluster_api_key_required_with_api_keys() {
        let config = parse(&[API_KEYS, STORAGE, IDENTITY, PEER1]);
        let message = config.validate().unwrap_err().to_string();
        assert!(message.contains("has no api_key and no cluster.api_key fallback"));
    }

    #[test]
    fn validate_cluster_api_key_not_required_without_api_keys() {
        let config = parse(&[STORAGE, IDENTITY, PEER1]);
        config.validate().unwrap();
    }

    #[test]
    fn validate_per_peer_keys_without_fallback() {
        let config = parse(&[
            API_KEYS,
            STORAGE,
            IDENTITY,
            PEER1,
            "api_key = \"peer1-key\"\n",
        ]);
        config.validate().unwrap();
    }

    #[test]
    fn validate_mixed_per_peer_and_fallback() {
        let config = parse(&[
            API_KEYS,
            STORAGE,
            CLUSTER_FALLBACK_KEY,
            IDENTITY,
            PEER1,
            "api_key = \"peer1-specific\"\n",
            PEER2,
        ]);
        config.validate().unwrap();
    }

    #[test]
    fn parse_peer_api_key_from_toml() {
        let config = parse(&[
            STORAGE,
            IDENTITY,
            PEER1,
            "api_key = \"peer1-secret\"\n",
            PEER2,
        ]);
        let cluster = config.cluster.unwrap();
        assert_eq!(cluster.peers[0].api_key.as_deref(), Some("peer1-secret"));
        assert_eq!(cluster.peers[1].api_key, None);
    }
}