1use std::path::PathBuf;
2
3use serde::{Deserialize, Serialize};
4
5use crate::storage::partition::PartitionGranularity;
6
7#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct ArchiverConfig {
10 #[serde(default = "default_listen_addr")]
11 pub listen_addr: String,
12 #[serde(default = "default_listen_port")]
13 pub listen_port: u16,
14 pub storage: StorageConfig,
15 #[serde(default)]
16 pub engine: EngineConfig,
17
18 #[serde(default)]
19 pub cluster: Option<ClusterConfig>,
20 #[serde(default)]
24 pub failover: Option<FailoverConfig>,
25 #[serde(default)]
28 pub pva: Option<PvaConfig>,
29 #[serde(default)]
33 pub api_keys: Option<Vec<String>>,
34 #[serde(default)]
36 pub security: SecurityConfig,
37 #[serde(default)]
39 pub tls: Option<TlsConfig>,
40}
41
/// Serde default for `ArchiverConfig::listen_addr`: the wildcard address `0.0.0.0`.
fn default_listen_addr() -> String {
    "0.0.0.0".to_string()
}
45
/// Serde default for `ArchiverConfig::listen_port`.
fn default_listen_port() -> u16 {
    17665
}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct StorageConfig {
53 pub sts: TierConfig,
54 pub mts: TierConfig,
55 pub lts: TierConfig,
56 #[serde(default)]
64 pub max_open_writers_total: Option<usize>,
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct TierConfig {
69 pub root_folder: PathBuf,
70 pub partition_granularity: PartitionGranularity,
71 #[serde(default = "default_hold")]
73 pub hold: u32,
74 #[serde(default = "default_gather")]
76 pub gather: u32,
77 #[serde(default)]
84 pub max_open_writers: Option<usize>,
85}
86
/// Serde default for `TierConfig::hold`.
fn default_hold() -> u32 {
    5
}
90
/// Serde default for `TierConfig::gather`.
fn default_gather() -> u32 {
    3
}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct EngineConfig {
98 #[serde(default = "default_write_period")]
100 pub write_period_secs: u64,
101 pub policy_file: Option<PathBuf>,
103 #[serde(default = "default_server_ioc_drift_secs")]
109 pub server_ioc_drift_secs: u64,
110 #[serde(default = "default_write_shards")]
117 pub write_shards: usize,
118 #[serde(default = "default_per_shard_buffer")]
125 pub per_shard_buffer: usize,
126}
127
/// Serde default for `EngineConfig::write_period_secs`, in seconds.
fn default_write_period() -> u64 {
    10
}
131
/// Serde default for `EngineConfig::server_ioc_drift_secs`: 30 minutes, in seconds.
fn default_server_ioc_drift_secs() -> u64 {
    30 * 60
}
135
/// Serde default for `EngineConfig::write_shards`.
fn default_write_shards() -> usize {
    1
}
139
/// Serde default for `EngineConfig::per_shard_buffer`.
fn default_per_shard_buffer() -> usize {
    4096
}
143
144impl Default for EngineConfig {
145 fn default() -> Self {
153 Self {
154 write_period_secs: default_write_period(),
155 policy_file: None,
156 server_ioc_drift_secs: default_server_ioc_drift_secs(),
157 write_shards: default_write_shards(),
158 per_shard_buffer: default_per_shard_buffer(),
159 }
160 }
161}
162
163#[derive(Debug, Clone, Serialize, Deserialize)]
165pub struct SecurityConfig {
166 #[serde(default)]
168 pub cors_origins: Vec<String>,
169 #[serde(default = "default_rate_limit_rps")]
171 pub rate_limit_rps: u32,
172 #[serde(default = "default_rate_limit_burst")]
174 pub rate_limit_burst: u32,
175 #[serde(default = "default_max_body_size")]
177 pub max_body_size: usize,
178 #[serde(default)]
182 pub trust_proxy_headers: bool,
183}
184
185impl Default for SecurityConfig {
186 fn default() -> Self {
187 Self {
188 cors_origins: Vec::new(),
189 rate_limit_rps: default_rate_limit_rps(),
190 rate_limit_burst: default_rate_limit_burst(),
191 max_body_size: default_max_body_size(),
192 trust_proxy_headers: false,
193 }
194 }
195}
196
/// Serde default for `SecurityConfig::rate_limit_rps`.
fn default_rate_limit_rps() -> u32 {
    100
}
200
/// Serde default for `SecurityConfig::rate_limit_burst`.
fn default_rate_limit_burst() -> u32 {
    200
}
204
/// Serde default for `SecurityConfig::max_body_size`: 10 MiB, in bytes.
fn default_max_body_size() -> usize {
    10 * 1024 * 1024
}
208
209#[derive(Debug, Clone, Serialize, Deserialize)]
211pub struct TlsConfig {
212 pub cert_path: PathBuf,
213 pub key_path: PathBuf,
214}
215
216#[derive(Debug, Clone, Serialize, Deserialize)]
218pub struct ApplianceIdentity {
219 pub name: String,
220 pub mgmt_url: String,
221 pub retrieval_url: String,
222 pub engine_url: String,
223 pub etl_url: String,
224}
225
226#[derive(Debug, Clone, Serialize, Deserialize)]
228pub struct PeerConfig {
229 pub name: String,
230 pub mgmt_url: String,
231 pub retrieval_url: String,
232 #[serde(default)]
235 pub api_key: Option<String>,
236}
237
238#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct ClusterConfig {
241 pub identity: ApplianceIdentity,
242 #[serde(default = "default_cache_ttl")]
243 pub cache_ttl_secs: u64,
244 #[serde(default = "default_peer_timeout")]
245 pub peer_timeout_secs: u64,
246 #[serde(default)]
247 pub peers: Vec<PeerConfig>,
248 #[serde(default)]
252 pub api_key: Option<String>,
253 #[serde(default)]
259 pub reassign_appliance_enabled: bool,
260}
261
/// Serde default for `ClusterConfig::cache_ttl_secs`, in seconds.
fn default_cache_ttl() -> u64 {
    300
}
265
/// Serde default for `ClusterConfig::peer_timeout_secs`, in seconds.
fn default_peer_timeout() -> u64 {
    30
}
269
270#[derive(Debug, Clone, Serialize, Deserialize)]
278pub struct FailoverConfig {
279 pub peers: Vec<String>,
282 #[serde(default = "default_failover_timeout")]
284 pub timeout_secs: u64,
285}
286
/// Serde default for `FailoverConfig::timeout_secs`, in seconds.
fn default_failover_timeout() -> u64 {
    30
}
290
291#[derive(Debug, Clone, Serialize, Deserialize)]
293pub struct PvaConfig {
294 #[serde(default = "default_pva_tcp_port")]
296 pub tcp_port: u16,
297 #[serde(default = "default_pva_udp_port")]
299 pub udp_port: u16,
300}
301
/// Serde default for `PvaConfig::tcp_port`.
fn default_pva_tcp_port() -> u16 {
    5075
}
305
/// Serde default for `PvaConfig::udp_port`.
fn default_pva_udp_port() -> u16 {
    5076
}
309
310impl Default for PvaConfig {
311 fn default() -> Self {
312 Self {
313 tcp_port: default_pva_tcp_port(),
314 udp_port: default_pva_udp_port(),
315 }
316 }
317}
318
319impl ArchiverConfig {
320 pub fn from_toml(s: &str) -> Result<Self, toml::de::Error> {
321 toml::from_str(s)
322 }
323
324 pub fn validate(&self) -> anyhow::Result<()> {
326 if self.engine.write_period_secs == 0 {
327 anyhow::bail!("engine.write_period_secs must be > 0");
334 }
335 if self.engine.write_shards == 0 {
336 anyhow::bail!(
337 "engine.write_shards must be > 0 (use 1 for the legacy single-worker layout)"
338 );
339 }
340 if self.engine.per_shard_buffer == 0 && self.engine.write_shards > 1 {
341 anyhow::bail!(
342 "engine.per_shard_buffer must be > 0 when write_shards > 1; \
343 a 0-capacity shard channel would drop every sample"
344 );
345 }
346 for (name, tier) in [
347 ("sts", &self.storage.sts),
348 ("mts", &self.storage.mts),
349 ("lts", &self.storage.lts),
350 ] {
351 if tier.gather >= tier.hold {
352 anyhow::bail!(
353 "{name}: gather ({}) must be less than hold ({})",
354 tier.gather,
355 tier.hold,
356 );
357 }
358 }
359 if let Some(ref cluster) = self.cluster {
360 if cluster.peer_timeout_secs == 0 {
361 anyhow::bail!("cluster.peer_timeout_secs must be > 0");
362 }
363 if cluster.cache_ttl_secs == 0 {
364 anyhow::bail!("cluster.cache_ttl_secs must be > 0");
365 }
366 if self.api_keys.is_some() && !cluster.peers.is_empty() {
369 let has_fallback = cluster.api_key.is_some();
370 for (i, peer) in cluster.peers.iter().enumerate() {
371 if peer.api_key.is_none() && !has_fallback {
372 anyhow::bail!(
373 "cluster.peers[{i}] ({}) has no api_key and no cluster.api_key fallback; \
374 proxied write requests to this peer will be rejected",
375 peer.name
376 );
377 }
378 }
379 }
380 for (i, peer) in cluster.peers.iter().enumerate() {
381 if !peer.mgmt_url.starts_with("http://") && !peer.mgmt_url.starts_with("https://") {
382 anyhow::bail!(
383 "cluster.peers[{i}].mgmt_url must start with http:// or https://"
384 );
385 }
386 if !peer.retrieval_url.starts_with("http://")
387 && !peer.retrieval_url.starts_with("https://")
388 {
389 anyhow::bail!(
390 "cluster.peers[{i}].retrieval_url must start with http:// or https://"
391 );
392 }
393 }
394 }
395 Ok(())
396 }
397}
398
#[cfg(test)]
mod tests {
    use super::*;

    /// The three mandatory storage tiers, shared by every fixture.
    const STORAGE: &str = r#"
[storage.sts]
root_folder = "/tmp/sts"
partition_granularity = "hour"

[storage.mts]
root_folder = "/tmp/mts"
partition_granularity = "day"

[storage.lts]
root_folder = "/tmp/lts"
partition_granularity = "year"
"#;

    /// Top-level key that turns API-key auth on. It must come before any
    /// table header, so it always goes first in a fixture.
    const API_KEYS: &str = "api_keys = [\"secret\"]\n";

    /// `[cluster]` table carrying the shared fallback key.
    const CLUSTER_FALLBACK_KEY: &str = "\n[cluster]\napi_key = \"shared-fallback\"\n";

    /// Identity of the local appliance.
    const IDENTITY: &str = r#"
[cluster.identity]
name = "appliance0"
mgmt_url = "http://host0:17665/mgmt/bpl"
retrieval_url = "http://host0:17665/retrieval"
engine_url = "http://host0:17665"
etl_url = "http://host0:17665"
"#;

    /// First peer without an `api_key`. Append an `api_key = ...` line right
    /// after it to give the peer its own key.
    const PEER1: &str = r#"
[[cluster.peers]]
name = "appliance1"
mgmt_url = "http://host1:17665/mgmt/bpl"
retrieval_url = "http://host1:17665/retrieval"
"#;

    /// Second peer without an `api_key`.
    const PEER2: &str = r#"
[[cluster.peers]]
name = "appliance2"
mgmt_url = "http://host2:17665/mgmt/bpl"
retrieval_url = "http://host2:17665/retrieval"
"#;

    /// Concatenates the TOML fragments and parses them, panicking on a parse error.
    fn parse(fragments: &[&str]) -> ArchiverConfig {
        ArchiverConfig::from_toml(&fragments.concat()).unwrap()
    }

    #[test]
    fn parse_config_without_cluster() {
        let config = parse(&[STORAGE]);
        assert!(config.cluster.is_none());
    }

    #[test]
    fn parse_config_with_cluster() {
        let config = parse(&[STORAGE, IDENTITY, PEER1]);
        let cluster = config.cluster.unwrap();
        assert_eq!(cluster.identity.name, "appliance0");
        assert_eq!(cluster.peers.len(), 1);
        assert_eq!(cluster.peers[0].name, "appliance1");
        assert_eq!(cluster.cache_ttl_secs, 300);
        assert_eq!(cluster.peer_timeout_secs, 30);
    }

    #[test]
    fn validate_cluster_api_key_required_with_api_keys() {
        let config = parse(&[API_KEYS, STORAGE, IDENTITY, PEER1]);
        let err = config.validate().unwrap_err();
        assert!(
            err.to_string()
                .contains("has no api_key and no cluster.api_key fallback")
        );
    }

    #[test]
    fn validate_cluster_api_key_not_required_without_api_keys() {
        // Auth is off, so a keyless peer is acceptable.
        let config = parse(&[STORAGE, IDENTITY, PEER1]);
        config.validate().unwrap();
    }

    #[test]
    fn validate_per_peer_keys_without_fallback() {
        let config = parse(&[
            API_KEYS,
            STORAGE,
            IDENTITY,
            PEER1,
            "api_key = \"peer1-key\"\n",
        ]);
        config.validate().unwrap();
    }

    #[test]
    fn validate_mixed_per_peer_and_fallback() {
        // appliance1 has its own key; appliance2 relies on cluster.api_key.
        let config = parse(&[
            API_KEYS,
            STORAGE,
            CLUSTER_FALLBACK_KEY,
            IDENTITY,
            PEER1,
            "api_key = \"peer1-specific\"\n",
            PEER2,
        ]);
        config.validate().unwrap();
    }

    #[test]
    fn parse_peer_api_key_from_toml() {
        let config = parse(&[
            STORAGE,
            IDENTITY,
            PEER1,
            "api_key = \"peer1-secret\"\n",
            PEER2,
        ]);
        let cluster = config.cluster.unwrap();
        assert_eq!(cluster.peers[0].api_key.as_deref(), Some("peer1-secret"));
        assert_eq!(cluster.peers[1].api_key, None);
    }
}