1use std::path::PathBuf;
2
3use serde::{Deserialize, Serialize};
4
5use crate::storage::partition::PartitionGranularity;
6
7#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct ArchiverConfig {
10 #[serde(default = "default_listen_addr")]
11 pub listen_addr: String,
12 #[serde(default = "default_listen_port")]
13 pub listen_port: u16,
14 pub storage: StorageConfig,
15 #[serde(default)]
16 pub engine: EngineConfig,
17
18 #[serde(default)]
19 pub cluster: Option<ClusterConfig>,
20 #[serde(default)]
24 pub failover: Option<FailoverConfig>,
25 #[serde(default)]
28 pub pva: Option<PvaConfig>,
29 #[serde(default)]
33 pub api_keys: Option<Vec<String>>,
34 #[serde(default)]
36 pub security: SecurityConfig,
37 #[serde(default)]
39 pub tls: Option<TlsConfig>,
40}
41
/// Serde fallback for `ArchiverConfig::listen_addr`: the IPv4 wildcard address.
fn default_listen_addr() -> String {
    String::from("0.0.0.0")
}
45
/// Serde fallback for `ArchiverConfig::listen_port`.
fn default_listen_port() -> u16 {
    const DEFAULT_PORT: u16 = 17_665;
    DEFAULT_PORT
}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct StorageConfig {
53 pub sts: TierConfig,
54 pub mts: TierConfig,
55 pub lts: TierConfig,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct TierConfig {
60 pub root_folder: PathBuf,
61 pub partition_granularity: PartitionGranularity,
62 #[serde(default = "default_hold")]
64 pub hold: u32,
65 #[serde(default = "default_gather")]
67 pub gather: u32,
68}
69
/// Serde fallback for `TierConfig::hold`.
fn default_hold() -> u32 {
    const DEFAULT_HOLD: u32 = 5;
    DEFAULT_HOLD
}
73
/// Serde fallback for `TierConfig::gather`.
fn default_gather() -> u32 {
    const DEFAULT_GATHER: u32 = 3;
    DEFAULT_GATHER
}
77
78#[derive(Debug, Clone, Default, Serialize, Deserialize)]
80pub struct EngineConfig {
81 #[serde(default = "default_write_period")]
83 pub write_period_secs: u64,
84 pub policy_file: Option<PathBuf>,
86 #[serde(default = "default_server_ioc_drift_secs")]
92 pub server_ioc_drift_secs: u64,
93}
94
/// Serde fallback for `EngineConfig::write_period_secs`, in seconds.
fn default_write_period() -> u64 {
    const DEFAULT_WRITE_PERIOD_SECS: u64 = 10;
    DEFAULT_WRITE_PERIOD_SECS
}
98
/// Serde fallback for `EngineConfig::server_ioc_drift_secs`: thirty minutes,
/// expressed in seconds.
fn default_server_ioc_drift_secs() -> u64 {
    const SECS_PER_MINUTE: u64 = 60;
    const DRIFT_MINUTES: u64 = 30;
    DRIFT_MINUTES * SECS_PER_MINUTE
}
102
103#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct SecurityConfig {
106 #[serde(default)]
108 pub cors_origins: Vec<String>,
109 #[serde(default = "default_rate_limit_rps")]
111 pub rate_limit_rps: u32,
112 #[serde(default = "default_rate_limit_burst")]
114 pub rate_limit_burst: u32,
115 #[serde(default = "default_max_body_size")]
117 pub max_body_size: usize,
118 #[serde(default)]
122 pub trust_proxy_headers: bool,
123}
124
125impl Default for SecurityConfig {
126 fn default() -> Self {
127 Self {
128 cors_origins: Vec::new(),
129 rate_limit_rps: default_rate_limit_rps(),
130 rate_limit_burst: default_rate_limit_burst(),
131 max_body_size: default_max_body_size(),
132 trust_proxy_headers: false,
133 }
134 }
135}
136
/// Serde fallback for `SecurityConfig::rate_limit_rps`.
fn default_rate_limit_rps() -> u32 {
    const DEFAULT_RPS: u32 = 100;
    DEFAULT_RPS
}
140
/// Serde fallback for `SecurityConfig::rate_limit_burst`.
fn default_rate_limit_burst() -> u32 {
    const DEFAULT_BURST: u32 = 200;
    DEFAULT_BURST
}
144
/// Serde fallback for `SecurityConfig::max_body_size`: ten times 1024 * 1024.
fn default_max_body_size() -> usize {
    const MIB: usize = 1024 * 1024;
    10 * MIB
}
148
149#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct TlsConfig {
152 pub cert_path: PathBuf,
153 pub key_path: PathBuf,
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct ApplianceIdentity {
159 pub name: String,
160 pub mgmt_url: String,
161 pub retrieval_url: String,
162 pub engine_url: String,
163 pub etl_url: String,
164}
165
166#[derive(Debug, Clone, Serialize, Deserialize)]
168pub struct PeerConfig {
169 pub name: String,
170 pub mgmt_url: String,
171 pub retrieval_url: String,
172 #[serde(default)]
175 pub api_key: Option<String>,
176}
177
178#[derive(Debug, Clone, Serialize, Deserialize)]
180pub struct ClusterConfig {
181 pub identity: ApplianceIdentity,
182 #[serde(default = "default_cache_ttl")]
183 pub cache_ttl_secs: u64,
184 #[serde(default = "default_peer_timeout")]
185 pub peer_timeout_secs: u64,
186 #[serde(default)]
187 pub peers: Vec<PeerConfig>,
188 #[serde(default)]
192 pub api_key: Option<String>,
193 #[serde(default)]
199 pub reassign_appliance_enabled: bool,
200}
201
/// Serde fallback for `ClusterConfig::cache_ttl_secs`, in seconds.
fn default_cache_ttl() -> u64 {
    const DEFAULT_CACHE_TTL_SECS: u64 = 300;
    DEFAULT_CACHE_TTL_SECS
}
205
/// Serde fallback for `ClusterConfig::peer_timeout_secs`, in seconds.
fn default_peer_timeout() -> u64 {
    const DEFAULT_PEER_TIMEOUT_SECS: u64 = 30;
    DEFAULT_PEER_TIMEOUT_SECS
}
209
210#[derive(Debug, Clone, Serialize, Deserialize)]
218pub struct FailoverConfig {
219 pub peers: Vec<String>,
222 #[serde(default = "default_failover_timeout")]
224 pub timeout_secs: u64,
225}
226
/// Serde fallback for `FailoverConfig::timeout_secs`, in seconds.
fn default_failover_timeout() -> u64 {
    const DEFAULT_FAILOVER_TIMEOUT_SECS: u64 = 30;
    DEFAULT_FAILOVER_TIMEOUT_SECS
}
230
231#[derive(Debug, Clone, Serialize, Deserialize)]
233pub struct PvaConfig {
234 #[serde(default = "default_pva_tcp_port")]
236 pub tcp_port: u16,
237 #[serde(default = "default_pva_udp_port")]
239 pub udp_port: u16,
240}
241
/// Serde fallback for `PvaConfig::tcp_port`.
fn default_pva_tcp_port() -> u16 {
    const DEFAULT_TCP_PORT: u16 = 5075;
    DEFAULT_TCP_PORT
}
245
/// Serde fallback for `PvaConfig::udp_port`.
fn default_pva_udp_port() -> u16 {
    const DEFAULT_UDP_PORT: u16 = 5076;
    DEFAULT_UDP_PORT
}
249
250impl Default for PvaConfig {
251 fn default() -> Self {
252 Self {
253 tcp_port: default_pva_tcp_port(),
254 udp_port: default_pva_udp_port(),
255 }
256 }
257}
258
259impl ArchiverConfig {
260 pub fn from_toml(s: &str) -> Result<Self, toml::de::Error> {
261 toml::from_str(s)
262 }
263
264 pub fn validate(&self) -> anyhow::Result<()> {
266 for (name, tier) in [
267 ("sts", &self.storage.sts),
268 ("mts", &self.storage.mts),
269 ("lts", &self.storage.lts),
270 ] {
271 if tier.gather >= tier.hold {
272 anyhow::bail!(
273 "{name}: gather ({}) must be less than hold ({})",
274 tier.gather,
275 tier.hold,
276 );
277 }
278 }
279 if let Some(ref cluster) = self.cluster {
280 if cluster.peer_timeout_secs == 0 {
281 anyhow::bail!("cluster.peer_timeout_secs must be > 0");
282 }
283 if cluster.cache_ttl_secs == 0 {
284 anyhow::bail!("cluster.cache_ttl_secs must be > 0");
285 }
286 if self.api_keys.is_some() && !cluster.peers.is_empty() {
289 let has_fallback = cluster.api_key.is_some();
290 for (i, peer) in cluster.peers.iter().enumerate() {
291 if peer.api_key.is_none() && !has_fallback {
292 anyhow::bail!(
293 "cluster.peers[{i}] ({}) has no api_key and no cluster.api_key fallback; \
294 proxied write requests to this peer will be rejected",
295 peer.name
296 );
297 }
298 }
299 }
300 for (i, peer) in cluster.peers.iter().enumerate() {
301 if !peer.mgmt_url.starts_with("http://") && !peer.mgmt_url.starts_with("https://") {
302 anyhow::bail!(
303 "cluster.peers[{i}].mgmt_url must start with http:// or https://"
304 );
305 }
306 if !peer.retrieval_url.starts_with("http://")
307 && !peer.retrieval_url.starts_with("https://")
308 {
309 anyhow::bail!(
310 "cluster.peers[{i}].retrieval_url must start with http:// or https://"
311 );
312 }
313 }
314 }
315 Ok(())
316 }
317}
318
#[cfg(test)]
mod tests {
    use super::*;

    /// Top-level `api_keys` entry; must precede every table header.
    const API_KEYS: &str = "api_keys = [\"secret\"]\n";

    /// Minimal `[storage.*]` tables — the only section the config requires.
    const STORAGE: &str = r#"
[storage.sts]
root_folder = "/tmp/sts"
partition_granularity = "hour"

[storage.mts]
root_folder = "/tmp/mts"
partition_granularity = "day"

[storage.lts]
root_folder = "/tmp/lts"
partition_granularity = "year"
"#;

    /// `[cluster]` table carrying the shared fallback key.
    const CLUSTER_FALLBACK_KEY: &str = "\n[cluster]\napi_key = \"shared-fallback\"\n";

    /// `[cluster.identity]` table for the local appliance.
    const IDENTITY: &str = r#"
[cluster.identity]
name = "appliance0"
mgmt_url = "http://host0:17665/mgmt/bpl"
retrieval_url = "http://host0:17665/retrieval"
engine_url = "http://host0:17665"
etl_url = "http://host0:17665"
"#;

    /// Renders one `[[cluster.peers]]` entry for `appliance{index}`,
    /// optionally with a per-peer `api_key`.
    fn peer(index: u32, api_key: Option<&str>) -> String {
        let mut table = format!(
            r#"
[[cluster.peers]]
name = "appliance{index}"
mgmt_url = "http://host{index}:17665/mgmt/bpl"
retrieval_url = "http://host{index}:17665/retrieval"
"#
        );
        if let Some(key) = api_key {
            table.push_str(&format!("api_key = \"{key}\"\n"));
        }
        table
    }

    /// Concatenates the TOML fragments in order and parses the result.
    fn parse(sections: &[&str]) -> ArchiverConfig {
        ArchiverConfig::from_toml(&sections.concat()).expect("fixture TOML must parse")
    }

    #[test]
    fn parse_config_without_cluster() {
        let config = parse(&[STORAGE]);
        assert!(config.cluster.is_none());
    }

    #[test]
    fn parse_config_with_cluster() {
        let config = parse(&[STORAGE, IDENTITY, peer(1, None).as_str()]);
        let cluster = config.cluster.unwrap();
        assert_eq!(cluster.identity.name, "appliance0");
        assert_eq!(cluster.peers.len(), 1);
        assert_eq!(cluster.peers[0].name, "appliance1");
        assert_eq!(cluster.cache_ttl_secs, 300);
        assert_eq!(cluster.peer_timeout_secs, 30);
    }

    #[test]
    fn validate_cluster_api_key_required_with_api_keys() {
        let config = parse(&[API_KEYS, STORAGE, IDENTITY, peer(1, None).as_str()]);
        let err = config.validate().unwrap_err();
        assert!(
            err.to_string()
                .contains("has no api_key and no cluster.api_key fallback")
        );
    }

    #[test]
    fn validate_cluster_api_key_not_required_without_api_keys() {
        let config = parse(&[STORAGE, IDENTITY, peer(1, None).as_str()]);
        config.validate().unwrap();
    }

    #[test]
    fn validate_per_peer_keys_without_fallback() {
        let config = parse(&[
            API_KEYS,
            STORAGE,
            IDENTITY,
            peer(1, Some("peer1-key")).as_str(),
        ]);
        config.validate().unwrap();
    }

    #[test]
    fn validate_mixed_per_peer_and_fallback() {
        let config = parse(&[
            API_KEYS,
            STORAGE,
            CLUSTER_FALLBACK_KEY,
            IDENTITY,
            peer(1, Some("peer1-specific")).as_str(),
            peer(2, None).as_str(),
        ]);
        config.validate().unwrap();
    }

    #[test]
    fn parse_peer_api_key_from_toml() {
        let config = parse(&[
            STORAGE,
            IDENTITY,
            peer(1, Some("peer1-secret")).as_str(),
            peer(2, None).as_str(),
        ]);
        let cluster = config.cluster.unwrap();
        assert_eq!(cluster.peers[0].api_key.as_deref(), Some("peer1-secret"));
        assert_eq!(cluster.peers[1].api_key, None);
    }

    #[test]
    fn validate_rejects_gather_not_below_hold() {
        let mut config = parse(&[STORAGE]);
        config.storage.mts.gather = config.storage.mts.hold;
        let err = config.validate().unwrap_err();
        assert!(err.to_string().starts_with("mts: gather"));
    }

    #[test]
    fn validate_rejects_non_http_peer_url() {
        let mut config = parse(&[STORAGE, IDENTITY, peer(1, None).as_str()]);
        config.cluster.as_mut().unwrap().peers[0].mgmt_url = "ftp://host1/mgmt".to_string();
        let err = config.validate().unwrap_err();
        assert!(
            err.to_string()
                .contains("cluster.peers[0].mgmt_url must start with http:// or https://")
        );
    }
}