//! Archiver configuration types (`archiver_core/config.rs`).

1use std::path::PathBuf;
2
3use serde::{Deserialize, Serialize};
4
5use crate::storage::partition::PartitionGranularity;
6
7/// Top-level archiver configuration (TOML-based).
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct ArchiverConfig {
10    #[serde(default = "default_listen_addr")]
11    pub listen_addr: String,
12    #[serde(default = "default_listen_port")]
13    pub listen_port: u16,
14    pub storage: StorageConfig,
15    #[serde(default)]
16    pub engine: EngineConfig,
17
18    #[serde(default)]
19    pub cluster: Option<ClusterConfig>,
20    /// Optional list of external archivers used for failover-merged retrieval.
21    /// When set, retrieval handlers fetch from each peer in addition to local
22    /// data and merge by timestamp (with duplicate-timestamp drop).
23    #[serde(default)]
24    pub failover: Option<FailoverConfig>,
25    /// PVA retrieval RPC server. When set, the archiver hosts
26    /// `archappl/getData` and `archappl/getDataAtTime` PVA RPC PVs.
27    #[serde(default)]
28    pub pva: Option<PvaConfig>,
29    /// Optional API keys for management endpoint authentication.
30    /// If set, mgmt write endpoints require `Authorization: Bearer <key>` or `X-API-Key: <key>`.
31    /// Retrieval GET endpoints remain open.
32    #[serde(default)]
33    pub api_keys: Option<Vec<String>>,
34    /// Security settings (CORS, rate limiting, body limits).
35    #[serde(default)]
36    pub security: SecurityConfig,
37    /// Optional TLS configuration for HTTPS.
38    #[serde(default)]
39    pub tls: Option<TlsConfig>,
40}
41
/// Serde fallback for `ArchiverConfig::listen_addr`: the IPv4 wildcard address.
fn default_listen_addr() -> String {
    String::from("0.0.0.0")
}
45
/// Serde fallback for `ArchiverConfig::listen_port`.
fn default_listen_port() -> u16 {
    const PORT: u16 = 17_665;
    PORT
}
49
50/// 3-tier storage configuration.
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct StorageConfig {
53    pub sts: TierConfig,
54    pub mts: TierConfig,
55    pub lts: TierConfig,
56    /// Process-wide cap on open `BufWriter` file handles, shared
57    /// across STS / MTS / LTS. When `Some`, every tier's
58    /// PlainPbStoragePlugin draws from the SAME [`FdBudget`] —
59    /// total open writers across all 3 tiers stays at-or-below
60    /// this number, so the process cannot summed-overflow its
61    /// `ulimit -n` even when each tier is busy. When `None`, each
62    /// tier keeps its own per-tier cap (legacy behavior).
63    #[serde(default)]
64    pub max_open_writers_total: Option<usize>,
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct TierConfig {
69    pub root_folder: PathBuf,
70    pub partition_granularity: PartitionGranularity,
71    /// Number of partitions to hold before ETL moves data out.
72    #[serde(default = "default_hold")]
73    pub hold: u32,
74    /// Number of partitions to gather (move out) at once.
75    #[serde(default = "default_gather")]
76    pub gather: u32,
77    /// Per-tier cap on open `BufWriter` file handles. Only
78    /// consulted when [`StorageConfig::max_open_writers_total`] is
79    /// `None`; otherwise all tiers share the global budget. `0`
80    /// disables the cap (lifts to `usize::MAX`); `None` falls back
81    /// to a tier-appropriate default (STS: 512, MTS/LTS: 64 — STS
82    /// gets the bulk because it's the live ingest path).
83    #[serde(default)]
84    pub max_open_writers: Option<usize>,
85}
86
/// Serde fallback for `TierConfig::hold`.
fn default_hold() -> u32 {
    const HOLD_PARTITIONS: u32 = 5;
    HOLD_PARTITIONS
}
90
/// Serde fallback for `TierConfig::gather`.
fn default_gather() -> u32 {
    const GATHER_PARTITIONS: u32 = 3;
    GATHER_PARTITIONS
}
94
95/// EPICS CA engine configuration.
96#[derive(Debug, Clone, Default, Serialize, Deserialize)]
97pub struct EngineConfig {
98    /// Write period in seconds — how often buffered samples flush to storage.
99    #[serde(default = "default_write_period")]
100    pub write_period_secs: u64,
101    /// Path to PV policy TOML file.
102    pub policy_file: Option<PathBuf>,
103    /// Maximum allowed drift between IOC-reported sample timestamps and
104    /// the appliance's wall clock, in either direction (Java parity
105    /// 6538631 — `org.epics.archiverappliance.engine.epics.SERVER_IOC_DRIFT_SECONDS`).
106    /// Default 30 minutes; set higher for known-skewed sites without
107    /// recompiling.
108    #[serde(default = "default_server_ioc_drift_secs")]
109    pub server_ioc_drift_secs: u64,
110    /// Number of parallel write-loop shards. `1` (the default)
111    /// keeps the legacy single-worker layout. Sites with many
112    /// active PVs and a fast STS can raise this to e.g. 4–16; the
113    /// engine spawns a dispatcher that hashes `pv_name` to a fixed
114    /// shard so per-PV ordering is preserved while different PVs
115    /// can append concurrently.
116    #[serde(default = "default_write_shards")]
117    pub write_shards: usize,
118    /// Per-shard mpsc capacity. Only consulted when `write_shards
119    /// > 1`. The dispatcher `try_send`s into each shard channel —
120    /// when a shard is saturated its overflow is dropped and
121    /// recorded on the per-PV `buffer_overflow_drops` counter,
122    /// while OTHER shards keep flowing (per-shard isolation).
123    #[serde(default = "default_per_shard_buffer")]
124    pub per_shard_buffer: usize,
125}
126
/// Serde fallback for `EngineConfig::write_period_secs`, in seconds.
fn default_write_period() -> u64 {
    const WRITE_PERIOD_SECS: u64 = 10;
    WRITE_PERIOD_SECS
}
130
/// Serde fallback for `EngineConfig::server_ioc_drift_secs`: 30 minutes,
/// expressed in seconds.
fn default_server_ioc_drift_secs() -> u64 {
    const SECS_PER_MINUTE: u64 = 60;
    const DRIFT_MINUTES: u64 = 30;
    DRIFT_MINUTES * SECS_PER_MINUTE
}
134
/// Serde fallback for `EngineConfig::write_shards`: a single shard.
fn default_write_shards() -> usize {
    const SINGLE_SHARD: usize = 1;
    SINGLE_SHARD
}
138
/// Serde fallback for `EngineConfig::per_shard_buffer`.
fn default_per_shard_buffer() -> usize {
    const SHARD_CAPACITY: usize = 4_096;
    SHARD_CAPACITY
}
142
143/// Security configuration.
144#[derive(Debug, Clone, Serialize, Deserialize)]
145pub struct SecurityConfig {
146    /// CORS allowed origins. Empty = same-origin only (strict).
147    #[serde(default)]
148    pub cors_origins: Vec<String>,
149    /// Rate limit: requests per second per IP (0 = disabled).
150    #[serde(default = "default_rate_limit_rps")]
151    pub rate_limit_rps: u32,
152    /// Rate limit burst size.
153    #[serde(default = "default_rate_limit_burst")]
154    pub rate_limit_burst: u32,
155    /// Maximum request body size in bytes (default 10MB).
156    #[serde(default = "default_max_body_size")]
157    pub max_body_size: usize,
158    /// Trust X-Forwarded-For header for client IP detection (e.g., behind a reverse proxy).
159    /// When false (default), only the direct connection IP is used for rate limiting.
160    /// Enable only when the server is behind a trusted reverse proxy.
161    #[serde(default)]
162    pub trust_proxy_headers: bool,
163}
164
165impl Default for SecurityConfig {
166    fn default() -> Self {
167        Self {
168            cors_origins: Vec::new(),
169            rate_limit_rps: default_rate_limit_rps(),
170            rate_limit_burst: default_rate_limit_burst(),
171            max_body_size: default_max_body_size(),
172            trust_proxy_headers: false,
173        }
174    }
175}
176
/// Serde fallback for `SecurityConfig::rate_limit_rps`.
fn default_rate_limit_rps() -> u32 {
    const REQUESTS_PER_SECOND: u32 = 100;
    REQUESTS_PER_SECOND
}
180
/// Serde fallback for `SecurityConfig::rate_limit_burst`.
fn default_rate_limit_burst() -> u32 {
    const BURST: u32 = 200;
    BURST
}
184
/// Serde fallback for `SecurityConfig::max_body_size`: 10MB, in bytes.
fn default_max_body_size() -> usize {
    const BYTES_PER_MIB: usize = 1024 * 1024;
    10 * BYTES_PER_MIB
}
188
189/// TLS configuration for HTTPS support.
190#[derive(Debug, Clone, Serialize, Deserialize)]
191pub struct TlsConfig {
192    pub cert_path: PathBuf,
193    pub key_path: PathBuf,
194}
195
196/// Identity of this appliance in a cluster.
197#[derive(Debug, Clone, Serialize, Deserialize)]
198pub struct ApplianceIdentity {
199    pub name: String,
200    pub mgmt_url: String,
201    pub retrieval_url: String,
202    pub engine_url: String,
203    pub etl_url: String,
204}
205
206/// A remote peer appliance.
207#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct PeerConfig {
209    pub name: String,
210    pub mgmt_url: String,
211    pub retrieval_url: String,
212    /// Per-peer outbound credential. When this appliance sends proxied requests
213    /// to this peer, it uses this key instead of the cluster-level `api_key`.
214    #[serde(default)]
215    pub api_key: Option<String>,
216}
217
218/// Cluster configuration for multi-appliance mode.
219#[derive(Debug, Clone, Serialize, Deserialize)]
220pub struct ClusterConfig {
221    pub identity: ApplianceIdentity,
222    #[serde(default = "default_cache_ttl")]
223    pub cache_ttl_secs: u64,
224    #[serde(default = "default_peer_timeout")]
225    pub peer_timeout_secs: u64,
226    #[serde(default)]
227    pub peers: Vec<PeerConfig>,
228    /// Shared secret for inter-peer authentication. Used as the outbound credential
229    /// for any peer that does not have its own `api_key` in `[[cluster.peers]]`.
230    /// Also serves as the inbound key this appliance accepts from peers.
231    #[serde(default)]
232    pub api_key: Option<String>,
233    /// Java parity (59f0758): explicit opt-in for the destructive
234    /// `reassignAppliance` live-migration endpoint. Default `false` so
235    /// having cluster mode enabled doesn't on its own permit the
236    /// migration — operators must validate destination data stores
237    /// before flipping this on.
238    #[serde(default)]
239    pub reassign_appliance_enabled: bool,
240}
241
/// Serde fallback for `ClusterConfig::cache_ttl_secs`, in seconds.
fn default_cache_ttl() -> u64 {
    const CACHE_TTL_SECS: u64 = 300;
    CACHE_TTL_SECS
}
245
/// Serde fallback for `ClusterConfig::peer_timeout_secs`, in seconds.
fn default_peer_timeout() -> u64 {
    const PEER_TIMEOUT_SECS: u64 = 30;
    PEER_TIMEOUT_SECS
}
249
250/// Failover retrieval configuration.
251///
252/// `peers` is a list of external archiver URLs serving the same Java-style
253/// retrieval endpoint (`/retrieval/data/getData.raw`). At query time, the
254/// archiver fetches the same `pv` + time range from each peer and merges
255/// the results with the local stream, dropping samples with duplicate
256/// timestamps.
257#[derive(Debug, Clone, Serialize, Deserialize)]
258pub struct FailoverConfig {
259    /// Per-peer retrieval base URLs (e.g. `https://archiver-b.example/retrieval`).
260    /// `getData.raw` is appended automatically.
261    pub peers: Vec<String>,
262    /// HTTP timeout per peer fetch (seconds).
263    #[serde(default = "default_failover_timeout")]
264    pub timeout_secs: u64,
265}
266
/// Serde fallback for `FailoverConfig::timeout_secs`, in seconds.
fn default_failover_timeout() -> u64 {
    const FAILOVER_TIMEOUT_SECS: u64 = 30;
    FAILOVER_TIMEOUT_SECS
}
270
271/// PVA retrieval RPC server configuration.
272#[derive(Debug, Clone, Serialize, Deserialize)]
273pub struct PvaConfig {
274    /// TCP port the PVA server listens on (default 5075).
275    #[serde(default = "default_pva_tcp_port")]
276    pub tcp_port: u16,
277    /// UDP port for PVA search/beacon (default 5076).
278    #[serde(default = "default_pva_udp_port")]
279    pub udp_port: u16,
280}
281
/// Serde fallback for `PvaConfig::tcp_port`.
fn default_pva_tcp_port() -> u16 {
    const PVA_TCP_PORT: u16 = 5075;
    PVA_TCP_PORT
}
285
/// Serde fallback for `PvaConfig::udp_port`.
fn default_pva_udp_port() -> u16 {
    const PVA_UDP_PORT: u16 = 5076;
    PVA_UDP_PORT
}
289
290impl Default for PvaConfig {
291    fn default() -> Self {
292        Self {
293            tcp_port: default_pva_tcp_port(),
294            udp_port: default_pva_udp_port(),
295        }
296    }
297}
298
299impl ArchiverConfig {
300    pub fn from_toml(s: &str) -> Result<Self, toml::de::Error> {
301        toml::from_str(s)
302    }
303
304    /// Validate configuration values that TOML deserialization alone cannot check.
305    pub fn validate(&self) -> anyhow::Result<()> {
306        if self.engine.write_period_secs == 0 {
307            // The write_loop drives flushes via tokio::time::interval,
308            // which panics on Duration::ZERO. The legacy elapsed-check
309            // tolerated 0 but the ticker path doesn't, so reject it
310            // at config load instead of letting the engine crash on
311            // first start. 1s is the smallest sensible value (sub-
312            // second flush rates serve no archiving purpose).
313            anyhow::bail!("engine.write_period_secs must be > 0");
314        }
315        if self.engine.write_shards == 0 {
316            anyhow::bail!("engine.write_shards must be > 0 (use 1 for the legacy single-worker layout)");
317        }
318        if self.engine.per_shard_buffer == 0 && self.engine.write_shards > 1 {
319            anyhow::bail!(
320                "engine.per_shard_buffer must be > 0 when write_shards > 1; \
321                 a 0-capacity shard channel would drop every sample"
322            );
323        }
324        for (name, tier) in [
325            ("sts", &self.storage.sts),
326            ("mts", &self.storage.mts),
327            ("lts", &self.storage.lts),
328        ] {
329            if tier.gather >= tier.hold {
330                anyhow::bail!(
331                    "{name}: gather ({}) must be less than hold ({})",
332                    tier.gather,
333                    tier.hold,
334                );
335            }
336        }
337        if let Some(ref cluster) = self.cluster {
338            if cluster.peer_timeout_secs == 0 {
339                anyhow::bail!("cluster.peer_timeout_secs must be > 0");
340            }
341            if cluster.cache_ttl_secs == 0 {
342                anyhow::bail!("cluster.cache_ttl_secs must be > 0");
343            }
344            // When external API keys are enabled, each peer must have an outbound
345            // credential — either its own `api_key` or the cluster-level fallback.
346            if self.api_keys.is_some() && !cluster.peers.is_empty() {
347                let has_fallback = cluster.api_key.is_some();
348                for (i, peer) in cluster.peers.iter().enumerate() {
349                    if peer.api_key.is_none() && !has_fallback {
350                        anyhow::bail!(
351                            "cluster.peers[{i}] ({}) has no api_key and no cluster.api_key fallback; \
352                             proxied write requests to this peer will be rejected",
353                            peer.name
354                        );
355                    }
356                }
357            }
358            for (i, peer) in cluster.peers.iter().enumerate() {
359                if !peer.mgmt_url.starts_with("http://") && !peer.mgmt_url.starts_with("https://") {
360                    anyhow::bail!(
361                        "cluster.peers[{i}].mgmt_url must start with http:// or https://"
362                    );
363                }
364                if !peer.retrieval_url.starts_with("http://")
365                    && !peer.retrieval_url.starts_with("https://")
366                {
367                    anyhow::bail!(
368                        "cluster.peers[{i}].retrieval_url must start with http:// or https://"
369                    );
370                }
371            }
372        }
373        Ok(())
374    }
375}
376
#[cfg(test)]
mod tests {
    use super::*;

    // TOML fragments shared by the fixtures. Each one starts and ends with
    // a newline, so concatenating them gives one blank-line-separated
    // document.

    /// The three mandatory `[storage.*]` tables.
    const STORAGE: &str = r#"
[storage.sts]
root_folder = "/tmp/sts"
partition_granularity = "hour"

[storage.mts]
root_folder = "/tmp/mts"
partition_granularity = "day"

[storage.lts]
root_folder = "/tmp/lts"
partition_granularity = "year"
"#;

    /// Top-level `api_keys` entry; must come before any table header.
    const API_KEYS: &str = r#"
api_keys = ["secret"]
"#;

    /// `[cluster.identity]` for the local appliance.
    const IDENTITY: &str = r#"
[cluster.identity]
name = "appliance0"
mgmt_url = "http://host0:17665/mgmt/bpl"
retrieval_url = "http://host0:17665/retrieval"
engine_url = "http://host0:17665"
etl_url = "http://host0:17665"
"#;

    /// Peer `appliance1` without an `api_key`; a key line may be appended.
    const PEER1: &str = r#"
[[cluster.peers]]
name = "appliance1"
mgmt_url = "http://host1:17665/mgmt/bpl"
retrieval_url = "http://host1:17665/retrieval"
"#;

    /// Peer `appliance2` without an `api_key`.
    const PEER2: &str = r#"
[[cluster.peers]]
name = "appliance2"
mgmt_url = "http://host2:17665/mgmt/bpl"
retrieval_url = "http://host2:17665/retrieval"
"#;

    /// Join the fragments into one TOML document and parse it, panicking
    /// if parsing fails.
    fn parse(fragments: &[&str]) -> ArchiverConfig {
        let toml = fragments.concat();
        ArchiverConfig::from_toml(&toml).unwrap()
    }

    #[test]
    fn parse_config_without_cluster() {
        let config = parse(&[STORAGE]);
        assert!(config.cluster.is_none());
    }

    #[test]
    fn parse_config_with_cluster() {
        let config = parse(&[STORAGE, IDENTITY, PEER1]);
        let cluster = config.cluster.unwrap();
        assert_eq!(cluster.identity.name, "appliance0");
        assert_eq!(cluster.peers.len(), 1);
        assert_eq!(cluster.peers[0].name, "appliance1");
        assert_eq!(cluster.cache_ttl_secs, 300);
        assert_eq!(cluster.peer_timeout_secs, 30);
    }

    #[test]
    fn validate_cluster_api_key_required_with_api_keys() {
        let config = parse(&[API_KEYS, STORAGE, IDENTITY, PEER1]);
        let message = config.validate().unwrap_err().to_string();
        assert!(message.contains("has no api_key and no cluster.api_key fallback"));
    }

    #[test]
    fn validate_cluster_api_key_not_required_without_api_keys() {
        // No api_keys → no requirement for cluster.api_key
        let config = parse(&[STORAGE, IDENTITY, PEER1]);
        config.validate().unwrap();
    }

    #[test]
    fn validate_per_peer_keys_without_fallback() {
        // Each peer has its own api_key → passes even without cluster.api_key.
        let config = parse(&[
            API_KEYS,
            STORAGE,
            IDENTITY,
            PEER1,
            "api_key = \"peer1-key\"\n",
        ]);
        config.validate().unwrap();
    }

    #[test]
    fn validate_mixed_per_peer_and_fallback() {
        // One peer has its own key, another relies on the fallback → passes.
        let config = parse(&[
            API_KEYS,
            STORAGE,
            "\n[cluster]\napi_key = \"shared-fallback\"\n",
            IDENTITY,
            PEER1,
            "api_key = \"peer1-specific\"\n",
            PEER2,
        ]);
        config.validate().unwrap();
    }

    #[test]
    fn parse_peer_api_key_from_toml() {
        let config = parse(&[
            STORAGE,
            IDENTITY,
            PEER1,
            "api_key = \"peer1-secret\"\n",
            PEER2,
        ]);
        let peers = config.cluster.unwrap().peers;
        assert_eq!(peers[0].api_key.as_deref(), Some("peer1-secret"));
        assert_eq!(peers[1].api_key, None);
    }
}