Skip to main content

archiver_core/
config.rs

1use std::path::PathBuf;
2
3use serde::{Deserialize, Serialize};
4
5use crate::storage::partition::PartitionGranularity;
6
7/// Top-level archiver configuration (TOML-based).
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct ArchiverConfig {
10    #[serde(default = "default_listen_addr")]
11    pub listen_addr: String,
12    #[serde(default = "default_listen_port")]
13    pub listen_port: u16,
14    pub storage: StorageConfig,
15    #[serde(default)]
16    pub engine: EngineConfig,
17
18    #[serde(default)]
19    pub cluster: Option<ClusterConfig>,
20    /// Optional list of external archivers used for failover-merged retrieval.
21    /// When set, retrieval handlers fetch from each peer in addition to local
22    /// data and merge by timestamp (with duplicate-timestamp drop).
23    #[serde(default)]
24    pub failover: Option<FailoverConfig>,
25    /// PVA retrieval RPC server. When set, the archiver hosts
26    /// `archappl/getData` and `archappl/getDataAtTime` PVA RPC PVs.
27    #[serde(default)]
28    pub pva: Option<PvaConfig>,
29    /// Optional API keys for management endpoint authentication.
30    /// If set, mgmt write endpoints require `Authorization: Bearer <key>` or `X-API-Key: <key>`.
31    /// Retrieval GET endpoints remain open.
32    #[serde(default)]
33    pub api_keys: Option<Vec<String>>,
34    /// Security settings (CORS, rate limiting, body limits).
35    #[serde(default)]
36    pub security: SecurityConfig,
37    /// Optional TLS configuration for HTTPS.
38    #[serde(default)]
39    pub tls: Option<TlsConfig>,
40}
41
/// Serde fallback for `listen_addr`: the string `"0.0.0.0"`.
fn default_listen_addr() -> String {
    String::from("0.0.0.0")
}
45
/// Serde fallback for `listen_port`.
fn default_listen_port() -> u16 {
    const DEFAULT_LISTEN_PORT: u16 = 17665;
    DEFAULT_LISTEN_PORT
}
49
50/// 3-tier storage configuration.
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct StorageConfig {
53    pub sts: TierConfig,
54    pub mts: TierConfig,
55    pub lts: TierConfig,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct TierConfig {
60    pub root_folder: PathBuf,
61    pub partition_granularity: PartitionGranularity,
62    /// Number of partitions to hold before ETL moves data out.
63    #[serde(default = "default_hold")]
64    pub hold: u32,
65    /// Number of partitions to gather (move out) at once.
66    #[serde(default = "default_gather")]
67    pub gather: u32,
68}
69
/// Serde fallback for `TierConfig::hold`.
fn default_hold() -> u32 {
    const DEFAULT_HOLD: u32 = 5;
    DEFAULT_HOLD
}
73
/// Serde fallback for `TierConfig::gather`.
fn default_gather() -> u32 {
    const DEFAULT_GATHER: u32 = 3;
    DEFAULT_GATHER
}
77
78/// EPICS CA engine configuration.
79#[derive(Debug, Clone, Default, Serialize, Deserialize)]
80pub struct EngineConfig {
81    /// Write period in seconds — how often buffered samples flush to storage.
82    #[serde(default = "default_write_period")]
83    pub write_period_secs: u64,
84    /// Path to PV policy TOML file.
85    pub policy_file: Option<PathBuf>,
86    /// Maximum allowed drift between IOC-reported sample timestamps and
87    /// the appliance's wall clock, in either direction (Java parity
88    /// 6538631 — `org.epics.archiverappliance.engine.epics.SERVER_IOC_DRIFT_SECONDS`).
89    /// Default 30 minutes; set higher for known-skewed sites without
90    /// recompiling.
91    #[serde(default = "default_server_ioc_drift_secs")]
92    pub server_ioc_drift_secs: u64,
93}
94
95fn default_write_period() -> u64 {
96    10
97}
98
99fn default_server_ioc_drift_secs() -> u64 {
100    30 * 60
101}
102
103/// Security configuration.
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct SecurityConfig {
106    /// CORS allowed origins. Empty = same-origin only (strict).
107    #[serde(default)]
108    pub cors_origins: Vec<String>,
109    /// Rate limit: requests per second per IP (0 = disabled).
110    #[serde(default = "default_rate_limit_rps")]
111    pub rate_limit_rps: u32,
112    /// Rate limit burst size.
113    #[serde(default = "default_rate_limit_burst")]
114    pub rate_limit_burst: u32,
115    /// Maximum request body size in bytes (default 10MB).
116    #[serde(default = "default_max_body_size")]
117    pub max_body_size: usize,
118    /// Trust X-Forwarded-For header for client IP detection (e.g., behind a reverse proxy).
119    /// When false (default), only the direct connection IP is used for rate limiting.
120    /// Enable only when the server is behind a trusted reverse proxy.
121    #[serde(default)]
122    pub trust_proxy_headers: bool,
123}
124
125impl Default for SecurityConfig {
126    fn default() -> Self {
127        Self {
128            cors_origins: Vec::new(),
129            rate_limit_rps: default_rate_limit_rps(),
130            rate_limit_burst: default_rate_limit_burst(),
131            max_body_size: default_max_body_size(),
132            trust_proxy_headers: false,
133        }
134    }
135}
136
/// Serde fallback for `SecurityConfig::rate_limit_rps`.
fn default_rate_limit_rps() -> u32 {
    const DEFAULT_RPS: u32 = 100;
    DEFAULT_RPS
}
140
/// Serde fallback for `SecurityConfig::rate_limit_burst`.
fn default_rate_limit_burst() -> u32 {
    const DEFAULT_BURST: u32 = 200;
    DEFAULT_BURST
}
144
/// Serde fallback for `SecurityConfig::max_body_size`: 10MB expressed in bytes.
fn default_max_body_size() -> usize {
    const BYTES_PER_MB: usize = 1024 * 1024;
    10 * BYTES_PER_MB
}
148
149/// TLS configuration for HTTPS support.
150#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct TlsConfig {
152    pub cert_path: PathBuf,
153    pub key_path: PathBuf,
154}
155
156/// Identity of this appliance in a cluster.
157#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct ApplianceIdentity {
159    pub name: String,
160    pub mgmt_url: String,
161    pub retrieval_url: String,
162    pub engine_url: String,
163    pub etl_url: String,
164}
165
166/// A remote peer appliance.
167#[derive(Debug, Clone, Serialize, Deserialize)]
168pub struct PeerConfig {
169    pub name: String,
170    pub mgmt_url: String,
171    pub retrieval_url: String,
172    /// Per-peer outbound credential. When this appliance sends proxied requests
173    /// to this peer, it uses this key instead of the cluster-level `api_key`.
174    #[serde(default)]
175    pub api_key: Option<String>,
176}
177
178/// Cluster configuration for multi-appliance mode.
179#[derive(Debug, Clone, Serialize, Deserialize)]
180pub struct ClusterConfig {
181    pub identity: ApplianceIdentity,
182    #[serde(default = "default_cache_ttl")]
183    pub cache_ttl_secs: u64,
184    #[serde(default = "default_peer_timeout")]
185    pub peer_timeout_secs: u64,
186    #[serde(default)]
187    pub peers: Vec<PeerConfig>,
188    /// Shared secret for inter-peer authentication. Used as the outbound credential
189    /// for any peer that does not have its own `api_key` in `[[cluster.peers]]`.
190    /// Also serves as the inbound key this appliance accepts from peers.
191    #[serde(default)]
192    pub api_key: Option<String>,
193    /// Java parity (59f0758): explicit opt-in for the destructive
194    /// `reassignAppliance` live-migration endpoint. Default `false` so
195    /// having cluster mode enabled doesn't on its own permit the
196    /// migration — operators must validate destination data stores
197    /// before flipping this on.
198    #[serde(default)]
199    pub reassign_appliance_enabled: bool,
200}
201
/// Serde fallback for `ClusterConfig::cache_ttl_secs`.
fn default_cache_ttl() -> u64 {
    const DEFAULT_CACHE_TTL_SECS: u64 = 300;
    DEFAULT_CACHE_TTL_SECS
}
205
/// Serde fallback for `ClusterConfig::peer_timeout_secs`.
fn default_peer_timeout() -> u64 {
    const DEFAULT_PEER_TIMEOUT_SECS: u64 = 30;
    DEFAULT_PEER_TIMEOUT_SECS
}
209
210/// Failover retrieval configuration.
211///
212/// `peers` is a list of external archiver URLs serving the same Java-style
213/// retrieval endpoint (`/retrieval/data/getData.raw`). At query time, the
214/// archiver fetches the same `pv` + time range from each peer and merges
215/// the results with the local stream, dropping samples with duplicate
216/// timestamps.
217#[derive(Debug, Clone, Serialize, Deserialize)]
218pub struct FailoverConfig {
219    /// Per-peer retrieval base URLs (e.g. `https://archiver-b.example/retrieval`).
220    /// `getData.raw` is appended automatically.
221    pub peers: Vec<String>,
222    /// HTTP timeout per peer fetch (seconds).
223    #[serde(default = "default_failover_timeout")]
224    pub timeout_secs: u64,
225}
226
/// Serde fallback for `FailoverConfig::timeout_secs`.
fn default_failover_timeout() -> u64 {
    const DEFAULT_FAILOVER_TIMEOUT_SECS: u64 = 30;
    DEFAULT_FAILOVER_TIMEOUT_SECS
}
230
231/// PVA retrieval RPC server configuration.
232#[derive(Debug, Clone, Serialize, Deserialize)]
233pub struct PvaConfig {
234    /// TCP port the PVA server listens on (default 5075).
235    #[serde(default = "default_pva_tcp_port")]
236    pub tcp_port: u16,
237    /// UDP port for PVA search/beacon (default 5076).
238    #[serde(default = "default_pva_udp_port")]
239    pub udp_port: u16,
240}
241
/// Serde fallback for `PvaConfig::tcp_port`.
fn default_pva_tcp_port() -> u16 {
    const DEFAULT_PVA_TCP_PORT: u16 = 5075;
    DEFAULT_PVA_TCP_PORT
}
245
/// Serde fallback for `PvaConfig::udp_port`.
fn default_pva_udp_port() -> u16 {
    const DEFAULT_PVA_UDP_PORT: u16 = 5076;
    DEFAULT_PVA_UDP_PORT
}
249
250impl Default for PvaConfig {
251    fn default() -> Self {
252        Self {
253            tcp_port: default_pva_tcp_port(),
254            udp_port: default_pva_udp_port(),
255        }
256    }
257}
258
259impl ArchiverConfig {
260    pub fn from_toml(s: &str) -> Result<Self, toml::de::Error> {
261        toml::from_str(s)
262    }
263
264    /// Validate configuration values that TOML deserialization alone cannot check.
265    pub fn validate(&self) -> anyhow::Result<()> {
266        for (name, tier) in [
267            ("sts", &self.storage.sts),
268            ("mts", &self.storage.mts),
269            ("lts", &self.storage.lts),
270        ] {
271            if tier.gather >= tier.hold {
272                anyhow::bail!(
273                    "{name}: gather ({}) must be less than hold ({})",
274                    tier.gather,
275                    tier.hold,
276                );
277            }
278        }
279        if let Some(ref cluster) = self.cluster {
280            if cluster.peer_timeout_secs == 0 {
281                anyhow::bail!("cluster.peer_timeout_secs must be > 0");
282            }
283            if cluster.cache_ttl_secs == 0 {
284                anyhow::bail!("cluster.cache_ttl_secs must be > 0");
285            }
286            // When external API keys are enabled, each peer must have an outbound
287            // credential — either its own `api_key` or the cluster-level fallback.
288            if self.api_keys.is_some() && !cluster.peers.is_empty() {
289                let has_fallback = cluster.api_key.is_some();
290                for (i, peer) in cluster.peers.iter().enumerate() {
291                    if peer.api_key.is_none() && !has_fallback {
292                        anyhow::bail!(
293                            "cluster.peers[{i}] ({}) has no api_key and no cluster.api_key fallback; \
294                             proxied write requests to this peer will be rejected",
295                            peer.name
296                        );
297                    }
298                }
299            }
300            for (i, peer) in cluster.peers.iter().enumerate() {
301                if !peer.mgmt_url.starts_with("http://") && !peer.mgmt_url.starts_with("https://") {
302                    anyhow::bail!(
303                        "cluster.peers[{i}].mgmt_url must start with http:// or https://"
304                    );
305                }
306                if !peer.retrieval_url.starts_with("http://")
307                    && !peer.retrieval_url.starts_with("https://")
308                {
309                    anyhow::bail!(
310                        "cluster.peers[{i}].retrieval_url must start with http:// or https://"
311                    );
312                }
313            }
314        }
315        Ok(())
316    }
317}
318
#[cfg(test)]
mod tests {
    use super::*;

    /// Top-level key that turns on external API keys. It must precede any
    /// table header in the assembled document.
    const API_KEYS_TOML: &str = r#"api_keys = ["secret"]"#;

    /// The three storage tier tables every fixture needs.
    const STORAGE_TOML: &str = r#"
[storage.sts]
root_folder = "/tmp/sts"
partition_granularity = "hour"

[storage.mts]
root_folder = "/tmp/mts"
partition_granularity = "day"

[storage.lts]
root_folder = "/tmp/lts"
partition_granularity = "year"
"#;

    /// Cluster-level fallback key table.
    const CLUSTER_FALLBACK_TOML: &str = r#"
[cluster]
api_key = "shared-fallback"
"#;

    /// Identity table for the local appliance (`appliance0`).
    const IDENTITY_TOML: &str = r#"
[cluster.identity]
name = "appliance0"
mgmt_url = "http://host0:17665/mgmt/bpl"
retrieval_url = "http://host0:17665/retrieval"
engine_url = "http://host0:17665"
etl_url = "http://host0:17665"
"#;

    /// Render one `[[cluster.peers]]` entry for `appliance{n}` on `host{n}`,
    /// optionally carrying its own `api_key`.
    fn peer_table(n: u32, api_key: Option<&str>) -> String {
        let mut table = format!(
            "\n[[cluster.peers]]\n\
             name = \"appliance{n}\"\n\
             mgmt_url = \"http://host{n}:17665/mgmt/bpl\"\n\
             retrieval_url = \"http://host{n}:17665/retrieval\"\n"
        );
        if let Some(key) = api_key {
            table.push_str(&format!("api_key = \"{key}\"\n"));
        }
        table
    }

    /// Join the given TOML fragments in order and parse the result.
    fn load(sections: &[&str]) -> ArchiverConfig {
        let document = sections.join("\n");
        ArchiverConfig::from_toml(&document).unwrap()
    }

    #[test]
    fn parse_config_without_cluster() {
        let config = load(&[STORAGE_TOML]);
        assert!(config.cluster.is_none());
    }

    #[test]
    fn parse_config_with_cluster() {
        let peer1 = peer_table(1, None);
        let config = load(&[STORAGE_TOML, IDENTITY_TOML, peer1.as_str()]);

        let cluster = config.cluster.unwrap();
        assert_eq!(cluster.identity.name, "appliance0");
        assert_eq!(cluster.peers.len(), 1);
        assert_eq!(cluster.peers[0].name, "appliance1");
        assert_eq!(cluster.cache_ttl_secs, 300);
        assert_eq!(cluster.peer_timeout_secs, 30);
    }

    #[test]
    fn validate_cluster_api_key_required_with_api_keys() {
        let peer1 = peer_table(1, None);
        let config = load(&[API_KEYS_TOML, STORAGE_TOML, IDENTITY_TOML, peer1.as_str()]);

        let message = config.validate().unwrap_err().to_string();
        assert!(message.contains("has no api_key and no cluster.api_key fallback"));
    }

    #[test]
    fn validate_cluster_api_key_not_required_without_api_keys() {
        let peer1 = peer_table(1, None);
        let config = load(&[STORAGE_TOML, IDENTITY_TOML, peer1.as_str()]);

        // No api_keys → no requirement for cluster.api_key
        config.validate().unwrap();
    }

    #[test]
    fn validate_per_peer_keys_without_fallback() {
        // Each peer has its own api_key → passes even without cluster.api_key.
        let peer1 = peer_table(1, Some("peer1-key"));
        let config = load(&[API_KEYS_TOML, STORAGE_TOML, IDENTITY_TOML, peer1.as_str()]);

        config.validate().unwrap();
    }

    #[test]
    fn validate_mixed_per_peer_and_fallback() {
        // One peer has its own key, another relies on the fallback → passes.
        let peer1 = peer_table(1, Some("peer1-specific"));
        let peer2 = peer_table(2, None);
        let config = load(&[
            API_KEYS_TOML,
            STORAGE_TOML,
            CLUSTER_FALLBACK_TOML,
            IDENTITY_TOML,
            peer1.as_str(),
            peer2.as_str(),
        ]);

        config.validate().unwrap();
    }

    #[test]
    fn parse_peer_api_key_from_toml() {
        let peer1 = peer_table(1, Some("peer1-secret"));
        let peer2 = peer_table(2, None);
        let config = load(&[
            STORAGE_TOML,
            IDENTITY_TOML,
            peer1.as_str(),
            peer2.as_str(),
        ]);

        let cluster = config.cluster.unwrap();
        assert_eq!(cluster.peers[0].api_key.as_deref(), Some("peer1-secret"));
        assert_eq!(cluster.peers[1].api_key, None);
    }
}