use std::path::PathBuf;
use serde::{Deserialize, Serialize};
use crate::storage::partition::PartitionGranularity;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArchiverConfig {
#[serde(default = "default_listen_addr")]
pub listen_addr: String,
#[serde(default = "default_listen_port")]
pub listen_port: u16,
pub storage: StorageConfig,
#[serde(default)]
pub engine: EngineConfig,
#[serde(default)]
pub cluster: Option<ClusterConfig>,
#[serde(default)]
pub failover: Option<FailoverConfig>,
#[serde(default)]
pub pva: Option<PvaConfig>,
#[serde(default)]
pub api_keys: Option<Vec<String>>,
#[serde(default)]
pub security: SecurityConfig,
#[serde(default)]
pub tls: Option<TlsConfig>,
}
fn default_listen_addr() -> String {
"0.0.0.0".to_string()
}
fn default_listen_port() -> u16 {
17665
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageConfig {
pub sts: TierConfig,
pub mts: TierConfig,
pub lts: TierConfig,
#[serde(default)]
pub max_open_writers_total: Option<usize>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TierConfig {
pub root_folder: PathBuf,
pub partition_granularity: PartitionGranularity,
#[serde(default = "default_hold")]
pub hold: u32,
#[serde(default = "default_gather")]
pub gather: u32,
#[serde(default)]
pub max_open_writers: Option<usize>,
}
fn default_hold() -> u32 {
5
}
fn default_gather() -> u32 {
3
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EngineConfig {
#[serde(default = "default_write_period")]
pub write_period_secs: u64,
pub policy_file: Option<PathBuf>,
#[serde(default = "default_server_ioc_drift_secs")]
pub server_ioc_drift_secs: u64,
#[serde(default = "default_write_shards")]
pub write_shards: usize,
#[serde(default = "default_per_shard_buffer")]
pub per_shard_buffer: usize,
}
fn default_write_period() -> u64 {
10
}
fn default_server_ioc_drift_secs() -> u64 {
30 * 60
}
fn default_write_shards() -> usize {
1
}
fn default_per_shard_buffer() -> usize {
4096
}
impl Default for EngineConfig {
fn default() -> Self {
Self {
write_period_secs: default_write_period(),
policy_file: None,
server_ioc_drift_secs: default_server_ioc_drift_secs(),
write_shards: default_write_shards(),
per_shard_buffer: default_per_shard_buffer(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
#[serde(default)]
pub cors_origins: Vec<String>,
#[serde(default = "default_rate_limit_rps")]
pub rate_limit_rps: u32,
#[serde(default = "default_rate_limit_burst")]
pub rate_limit_burst: u32,
#[serde(default = "default_max_body_size")]
pub max_body_size: usize,
#[serde(default)]
pub trust_proxy_headers: bool,
}
impl Default for SecurityConfig {
fn default() -> Self {
Self {
cors_origins: Vec::new(),
rate_limit_rps: default_rate_limit_rps(),
rate_limit_burst: default_rate_limit_burst(),
max_body_size: default_max_body_size(),
trust_proxy_headers: false,
}
}
}
fn default_rate_limit_rps() -> u32 {
100
}
fn default_rate_limit_burst() -> u32 {
200
}
fn default_max_body_size() -> usize {
10 * 1024 * 1024 }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TlsConfig {
pub cert_path: PathBuf,
pub key_path: PathBuf,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApplianceIdentity {
pub name: String,
pub mgmt_url: String,
pub retrieval_url: String,
pub engine_url: String,
pub etl_url: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerConfig {
pub name: String,
pub mgmt_url: String,
pub retrieval_url: String,
#[serde(default)]
pub api_key: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterConfig {
pub identity: ApplianceIdentity,
#[serde(default = "default_cache_ttl")]
pub cache_ttl_secs: u64,
#[serde(default = "default_peer_timeout")]
pub peer_timeout_secs: u64,
#[serde(default)]
pub peers: Vec<PeerConfig>,
#[serde(default)]
pub api_key: Option<String>,
#[serde(default)]
pub reassign_appliance_enabled: bool,
}
fn default_cache_ttl() -> u64 {
300
}
fn default_peer_timeout() -> u64 {
30
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailoverConfig {
pub peers: Vec<String>,
#[serde(default = "default_failover_timeout")]
pub timeout_secs: u64,
}
fn default_failover_timeout() -> u64 {
30
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PvaConfig {
#[serde(default = "default_pva_tcp_port")]
pub tcp_port: u16,
#[serde(default = "default_pva_udp_port")]
pub udp_port: u16,
}
fn default_pva_tcp_port() -> u16 {
5075
}
fn default_pva_udp_port() -> u16 {
5076
}
impl Default for PvaConfig {
fn default() -> Self {
Self {
tcp_port: default_pva_tcp_port(),
udp_port: default_pva_udp_port(),
}
}
}
impl ArchiverConfig {
pub fn from_toml(s: &str) -> Result<Self, toml::de::Error> {
toml::from_str(s)
}
pub fn validate(&self) -> anyhow::Result<()> {
if self.engine.write_period_secs == 0 {
anyhow::bail!("engine.write_period_secs must be > 0");
}
if self.engine.write_shards == 0 {
anyhow::bail!(
"engine.write_shards must be > 0 (use 1 for the legacy single-worker layout)"
);
}
if self.engine.per_shard_buffer == 0 && self.engine.write_shards > 1 {
anyhow::bail!(
"engine.per_shard_buffer must be > 0 when write_shards > 1; \
a 0-capacity shard channel would drop every sample"
);
}
for (name, tier) in [
("sts", &self.storage.sts),
("mts", &self.storage.mts),
("lts", &self.storage.lts),
] {
if tier.gather >= tier.hold {
anyhow::bail!(
"{name}: gather ({}) must be less than hold ({})",
tier.gather,
tier.hold,
);
}
}
if let Some(ref cluster) = self.cluster {
if cluster.peer_timeout_secs == 0 {
anyhow::bail!("cluster.peer_timeout_secs must be > 0");
}
if cluster.cache_ttl_secs == 0 {
anyhow::bail!("cluster.cache_ttl_secs must be > 0");
}
if self.api_keys.is_some() && !cluster.peers.is_empty() {
let has_fallback = cluster.api_key.is_some();
for (i, peer) in cluster.peers.iter().enumerate() {
if peer.api_key.is_none() && !has_fallback {
anyhow::bail!(
"cluster.peers[{i}] ({}) has no api_key and no cluster.api_key fallback; \
proxied write requests to this peer will be rejected",
peer.name
);
}
}
}
for (i, peer) in cluster.peers.iter().enumerate() {
if !peer.mgmt_url.starts_with("http://") && !peer.mgmt_url.starts_with("https://") {
anyhow::bail!(
"cluster.peers[{i}].mgmt_url must start with http:// or https://"
);
}
if !peer.retrieval_url.starts_with("http://")
&& !peer.retrieval_url.starts_with("https://")
{
anyhow::bail!(
"cluster.peers[{i}].retrieval_url must start with http:// or https://"
);
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_config_without_cluster() {
let toml = r#"
[storage.sts]
root_folder = "/tmp/sts"
partition_granularity = "hour"
[storage.mts]
root_folder = "/tmp/mts"
partition_granularity = "day"
[storage.lts]
root_folder = "/tmp/lts"
partition_granularity = "year"
"#;
let config = ArchiverConfig::from_toml(toml).unwrap();
assert!(config.cluster.is_none());
}
#[test]
fn parse_config_with_cluster() {
let toml = r#"
[storage.sts]
root_folder = "/tmp/sts"
partition_granularity = "hour"
[storage.mts]
root_folder = "/tmp/mts"
partition_granularity = "day"
[storage.lts]
root_folder = "/tmp/lts"
partition_granularity = "year"
[cluster.identity]
name = "appliance0"
mgmt_url = "http://host0:17665/mgmt/bpl"
retrieval_url = "http://host0:17665/retrieval"
engine_url = "http://host0:17665"
etl_url = "http://host0:17665"
[[cluster.peers]]
name = "appliance1"
mgmt_url = "http://host1:17665/mgmt/bpl"
retrieval_url = "http://host1:17665/retrieval"
"#;
let config = ArchiverConfig::from_toml(toml).unwrap();
let cluster = config.cluster.unwrap();
assert_eq!(cluster.identity.name, "appliance0");
assert_eq!(cluster.peers.len(), 1);
assert_eq!(cluster.peers[0].name, "appliance1");
assert_eq!(cluster.cache_ttl_secs, 300);
assert_eq!(cluster.peer_timeout_secs, 30);
}
#[test]
fn validate_cluster_api_key_required_with_api_keys() {
let toml = r#"
api_keys = ["secret"]
[storage.sts]
root_folder = "/tmp/sts"
partition_granularity = "hour"
[storage.mts]
root_folder = "/tmp/mts"
partition_granularity = "day"
[storage.lts]
root_folder = "/tmp/lts"
partition_granularity = "year"
[cluster.identity]
name = "appliance0"
mgmt_url = "http://host0:17665/mgmt/bpl"
retrieval_url = "http://host0:17665/retrieval"
engine_url = "http://host0:17665"
etl_url = "http://host0:17665"
[[cluster.peers]]
name = "appliance1"
mgmt_url = "http://host1:17665/mgmt/bpl"
retrieval_url = "http://host1:17665/retrieval"
"#;
let config = ArchiverConfig::from_toml(toml).unwrap();
let err = config.validate().unwrap_err();
assert!(
err.to_string()
.contains("has no api_key and no cluster.api_key fallback")
);
}
#[test]
fn validate_cluster_api_key_not_required_without_api_keys() {
let toml = r#"
[storage.sts]
root_folder = "/tmp/sts"
partition_granularity = "hour"
[storage.mts]
root_folder = "/tmp/mts"
partition_granularity = "day"
[storage.lts]
root_folder = "/tmp/lts"
partition_granularity = "year"
[cluster.identity]
name = "appliance0"
mgmt_url = "http://host0:17665/mgmt/bpl"
retrieval_url = "http://host0:17665/retrieval"
engine_url = "http://host0:17665"
etl_url = "http://host0:17665"
[[cluster.peers]]
name = "appliance1"
mgmt_url = "http://host1:17665/mgmt/bpl"
retrieval_url = "http://host1:17665/retrieval"
"#;
let config = ArchiverConfig::from_toml(toml).unwrap();
config.validate().unwrap(); }
#[test]
fn validate_per_peer_keys_without_fallback() {
let toml = r#"
api_keys = ["secret"]
[storage.sts]
root_folder = "/tmp/sts"
partition_granularity = "hour"
[storage.mts]
root_folder = "/tmp/mts"
partition_granularity = "day"
[storage.lts]
root_folder = "/tmp/lts"
partition_granularity = "year"
[cluster.identity]
name = "appliance0"
mgmt_url = "http://host0:17665/mgmt/bpl"
retrieval_url = "http://host0:17665/retrieval"
engine_url = "http://host0:17665"
etl_url = "http://host0:17665"
[[cluster.peers]]
name = "appliance1"
mgmt_url = "http://host1:17665/mgmt/bpl"
retrieval_url = "http://host1:17665/retrieval"
api_key = "peer1-key"
"#;
let config = ArchiverConfig::from_toml(toml).unwrap();
config.validate().unwrap();
}
#[test]
fn validate_mixed_per_peer_and_fallback() {
let toml = r#"
api_keys = ["secret"]
[storage.sts]
root_folder = "/tmp/sts"
partition_granularity = "hour"
[storage.mts]
root_folder = "/tmp/mts"
partition_granularity = "day"
[storage.lts]
root_folder = "/tmp/lts"
partition_granularity = "year"
[cluster]
api_key = "shared-fallback"
[cluster.identity]
name = "appliance0"
mgmt_url = "http://host0:17665/mgmt/bpl"
retrieval_url = "http://host0:17665/retrieval"
engine_url = "http://host0:17665"
etl_url = "http://host0:17665"
[[cluster.peers]]
name = "appliance1"
mgmt_url = "http://host1:17665/mgmt/bpl"
retrieval_url = "http://host1:17665/retrieval"
api_key = "peer1-specific"
[[cluster.peers]]
name = "appliance2"
mgmt_url = "http://host2:17665/mgmt/bpl"
retrieval_url = "http://host2:17665/retrieval"
"#;
let config = ArchiverConfig::from_toml(toml).unwrap();
config.validate().unwrap();
}
#[test]
fn parse_peer_api_key_from_toml() {
let toml = r#"
[storage.sts]
root_folder = "/tmp/sts"
partition_granularity = "hour"
[storage.mts]
root_folder = "/tmp/mts"
partition_granularity = "day"
[storage.lts]
root_folder = "/tmp/lts"
partition_granularity = "year"
[cluster.identity]
name = "appliance0"
mgmt_url = "http://host0:17665/mgmt/bpl"
retrieval_url = "http://host0:17665/retrieval"
engine_url = "http://host0:17665"
etl_url = "http://host0:17665"
[[cluster.peers]]
name = "appliance1"
mgmt_url = "http://host1:17665/mgmt/bpl"
retrieval_url = "http://host1:17665/retrieval"
api_key = "peer1-secret"
[[cluster.peers]]
name = "appliance2"
mgmt_url = "http://host2:17665/mgmt/bpl"
retrieval_url = "http://host2:17665/retrieval"
"#;
let config = ArchiverConfig::from_toml(toml).unwrap();
let cluster = config.cluster.unwrap();
assert_eq!(cluster.peers[0].api_key.as_deref(), Some("peer1-secret"));
assert_eq!(cluster.peers[1].api_key, None);
}
}