Skip to main content

archiver_core/
config.rs

1use std::path::PathBuf;
2
3use serde::{Deserialize, Serialize};
4
5use crate::storage::partition::PartitionGranularity;
6
7/// Top-level archiver configuration (TOML-based).
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct ArchiverConfig {
10    #[serde(default = "default_listen_addr")]
11    pub listen_addr: String,
12    #[serde(default = "default_listen_port")]
13    pub listen_port: u16,
14    pub storage: StorageConfig,
15    #[serde(default)]
16    pub engine: EngineConfig,
17
18    #[serde(default)]
19    pub cluster: Option<ClusterConfig>,
20    /// Optional list of external archivers used for failover-merged retrieval.
21    /// When set, retrieval handlers fetch from each peer in addition to local
22    /// data and merge by timestamp (with duplicate-timestamp drop).
23    #[serde(default)]
24    pub failover: Option<FailoverConfig>,
25    /// PVA retrieval RPC server. When set, the archiver hosts
26    /// `archappl/getData` and `archappl/getDataAtTime` PVA RPC PVs.
27    #[serde(default)]
28    pub pva: Option<PvaConfig>,
29    /// Optional API keys for management endpoint authentication.
30    /// If set, mgmt write endpoints require `Authorization: Bearer <key>` or `X-API-Key: <key>`.
31    /// Retrieval GET endpoints remain open.
32    #[serde(default)]
33    pub api_keys: Option<Vec<String>>,
34    /// Security settings (CORS, rate limiting, body limits).
35    #[serde(default)]
36    pub security: SecurityConfig,
37    /// Optional TLS configuration for HTTPS.
38    #[serde(default)]
39    pub tls: Option<TlsConfig>,
40}
41
/// Serde default for `ArchiverConfig::listen_addr`.
fn default_listen_addr() -> String {
    String::from("0.0.0.0")
}
45
/// Serde default for `ArchiverConfig::listen_port`.
fn default_listen_port() -> u16 {
    17665
}
49
50/// 3-tier storage configuration.
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct StorageConfig {
53    pub sts: TierConfig,
54    pub mts: TierConfig,
55    pub lts: TierConfig,
56    /// Process-wide cap on open `BufWriter` file handles, shared
57    /// across STS / MTS / LTS. When `Some`, every tier's
58    /// PlainPbStoragePlugin draws from the SAME [`FdBudget`] —
59    /// total open writers across all 3 tiers stays at-or-below
60    /// this number, so the process cannot summed-overflow its
61    /// `ulimit -n` even when each tier is busy. When `None`, each
62    /// tier keeps its own per-tier cap (legacy behavior).
63    #[serde(default)]
64    pub max_open_writers_total: Option<usize>,
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct TierConfig {
69    pub root_folder: PathBuf,
70    pub partition_granularity: PartitionGranularity,
71    /// Number of partitions to hold before ETL moves data out.
72    #[serde(default = "default_hold")]
73    pub hold: u32,
74    /// Number of partitions to gather (move out) at once.
75    #[serde(default = "default_gather")]
76    pub gather: u32,
77    /// Per-tier cap on open `BufWriter` file handles. Only
78    /// consulted when [`StorageConfig::max_open_writers_total`] is
79    /// `None`; otherwise all tiers share the global budget. `0`
80    /// disables the cap (lifts to `usize::MAX`); `None` falls back
81    /// to a tier-appropriate default (STS: 512, MTS/LTS: 64 — STS
82    /// gets the bulk because it's the live ingest path).
83    #[serde(default)]
84    pub max_open_writers: Option<usize>,
85}
86
/// Serde default for `TierConfig::hold`.
fn default_hold() -> u32 {
    5
}
90
/// Serde default for `TierConfig::gather`.
fn default_gather() -> u32 {
    3
}
94
95/// EPICS CA engine configuration.
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct EngineConfig {
98    /// Write period in seconds — how often buffered samples flush to storage.
99    #[serde(default = "default_write_period")]
100    pub write_period_secs: u64,
101    /// Path to PV policy TOML file.
102    pub policy_file: Option<PathBuf>,
103    /// Maximum allowed drift between IOC-reported sample timestamps and
104    /// the appliance's wall clock, in either direction (Java parity
105    /// 6538631 — `org.epics.archiverappliance.engine.epics.SERVER_IOC_DRIFT_SECONDS`).
106    /// Default 30 minutes; set higher for known-skewed sites without
107    /// recompiling.
108    #[serde(default = "default_server_ioc_drift_secs")]
109    pub server_ioc_drift_secs: u64,
110    /// Number of parallel write-loop shards. `1` (the default)
111    /// keeps the legacy single-worker layout. Sites with many
112    /// active PVs and a fast STS can raise this to e.g. 4–16; the
113    /// engine spawns a dispatcher that hashes `pv_name` to a fixed
114    /// shard so per-PV ordering is preserved while different PVs
115    /// can append concurrently.
116    #[serde(default = "default_write_shards")]
117    pub write_shards: usize,
118    /// Per-shard mpsc capacity. Only consulted when
119    /// `write_shards > 1`. The dispatcher `try_send`s into each
120    /// shard channel — when a shard is saturated its overflow is
121    /// dropped and recorded on the per-PV `buffer_overflow_drops`
122    /// counter, while OTHER shards keep flowing (per-shard
123    /// isolation).
124    #[serde(default = "default_per_shard_buffer")]
125    pub per_shard_buffer: usize,
126}
127
/// Serde default for `EngineConfig::write_period_secs` (seconds).
fn default_write_period() -> u64 {
    10
}
131
/// Serde default for `EngineConfig::server_ioc_drift_secs`: 30 minutes,
/// expressed in seconds.
fn default_server_ioc_drift_secs() -> u64 {
    30 * 60
}
135
/// Serde default for `EngineConfig::write_shards`.
fn default_write_shards() -> usize {
    1
}
139
/// Serde default for `EngineConfig::per_shard_buffer`.
fn default_per_shard_buffer() -> usize {
    4096
}
143
144impl Default for EngineConfig {
145    // `#[serde(default)]` on the outer `engine` field falls back to
146    // this when the TOML omits `[engine]` entirely. Without a manual
147    // impl, the derived `Default` would zero every numeric field —
148    // notably `write_period_secs = 0`, which then trips
149    // `validate()` ("must be > 0"). Mirror the per-field
150    // `#[serde(default = "...")]` defaults so the no-`[engine]`
151    // and empty-`[engine]` cases behave identically.
152    fn default() -> Self {
153        Self {
154            write_period_secs: default_write_period(),
155            policy_file: None,
156            server_ioc_drift_secs: default_server_ioc_drift_secs(),
157            write_shards: default_write_shards(),
158            per_shard_buffer: default_per_shard_buffer(),
159        }
160    }
161}
162
163/// Security configuration.
164#[derive(Debug, Clone, Serialize, Deserialize)]
165pub struct SecurityConfig {
166    /// CORS allowed origins. Empty = same-origin only (strict).
167    #[serde(default)]
168    pub cors_origins: Vec<String>,
169    /// Rate limit: requests per second per IP (0 = disabled).
170    #[serde(default = "default_rate_limit_rps")]
171    pub rate_limit_rps: u32,
172    /// Rate limit burst size.
173    #[serde(default = "default_rate_limit_burst")]
174    pub rate_limit_burst: u32,
175    /// Maximum request body size in bytes (default 10MB).
176    #[serde(default = "default_max_body_size")]
177    pub max_body_size: usize,
178    /// Trust X-Forwarded-For header for client IP detection (e.g., behind a reverse proxy).
179    /// When false (default), only the direct connection IP is used for rate limiting.
180    /// Enable only when the server is behind a trusted reverse proxy.
181    #[serde(default)]
182    pub trust_proxy_headers: bool,
183}
184
185impl Default for SecurityConfig {
186    fn default() -> Self {
187        Self {
188            cors_origins: Vec::new(),
189            rate_limit_rps: default_rate_limit_rps(),
190            rate_limit_burst: default_rate_limit_burst(),
191            max_body_size: default_max_body_size(),
192            trust_proxy_headers: false,
193        }
194    }
195}
196
/// Serde default for `SecurityConfig::rate_limit_rps`.
fn default_rate_limit_rps() -> u32 {
    100
}
200
/// Serde default for `SecurityConfig::rate_limit_burst`.
fn default_rate_limit_burst() -> u32 {
    200
}
204
/// Serde default for `SecurityConfig::max_body_size`, in bytes.
fn default_max_body_size() -> usize {
    10 * 1024 * 1024 // 10MB
}
208
209/// TLS configuration for HTTPS support.
210#[derive(Debug, Clone, Serialize, Deserialize)]
211pub struct TlsConfig {
212    pub cert_path: PathBuf,
213    pub key_path: PathBuf,
214}
215
216/// Identity of this appliance in a cluster.
217#[derive(Debug, Clone, Serialize, Deserialize)]
218pub struct ApplianceIdentity {
219    pub name: String,
220    pub mgmt_url: String,
221    pub retrieval_url: String,
222    pub engine_url: String,
223    pub etl_url: String,
224}
225
226/// A remote peer appliance.
227#[derive(Debug, Clone, Serialize, Deserialize)]
228pub struct PeerConfig {
229    pub name: String,
230    pub mgmt_url: String,
231    pub retrieval_url: String,
232    /// Per-peer outbound credential. When this appliance sends proxied requests
233    /// to this peer, it uses this key instead of the cluster-level `api_key`.
234    #[serde(default)]
235    pub api_key: Option<String>,
236}
237
238/// Cluster configuration for multi-appliance mode.
239#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct ClusterConfig {
241    pub identity: ApplianceIdentity,
242    #[serde(default = "default_cache_ttl")]
243    pub cache_ttl_secs: u64,
244    #[serde(default = "default_peer_timeout")]
245    pub peer_timeout_secs: u64,
246    #[serde(default)]
247    pub peers: Vec<PeerConfig>,
248    /// Shared secret for inter-peer authentication. Used as the outbound credential
249    /// for any peer that does not have its own `api_key` in `[[cluster.peers]]`.
250    /// Also serves as the inbound key this appliance accepts from peers.
251    #[serde(default)]
252    pub api_key: Option<String>,
253    /// Java parity (59f0758): explicit opt-in for the destructive
254    /// `reassignAppliance` live-migration endpoint. Default `false` so
255    /// having cluster mode enabled doesn't on its own permit the
256    /// migration — operators must validate destination data stores
257    /// before flipping this on.
258    #[serde(default)]
259    pub reassign_appliance_enabled: bool,
260}
261
/// Serde default for `ClusterConfig::cache_ttl_secs` (seconds).
fn default_cache_ttl() -> u64 {
    300
}
265
/// Serde default for `ClusterConfig::peer_timeout_secs` (seconds).
fn default_peer_timeout() -> u64 {
    30
}
269
270/// Failover retrieval configuration.
271///
272/// `peers` is a list of external archiver URLs serving the same Java-style
273/// retrieval endpoint (`/retrieval/data/getData.raw`). At query time, the
274/// archiver fetches the same `pv` + time range from each peer and merges
275/// the results with the local stream, dropping samples with duplicate
276/// timestamps.
277#[derive(Debug, Clone, Serialize, Deserialize)]
278pub struct FailoverConfig {
279    /// Per-peer retrieval base URLs (e.g. `https://archiver-b.example/retrieval`).
280    /// `getData.raw` is appended automatically.
281    pub peers: Vec<String>,
282    /// HTTP timeout per peer fetch (seconds).
283    #[serde(default = "default_failover_timeout")]
284    pub timeout_secs: u64,
285}
286
/// Serde default for `FailoverConfig::timeout_secs` (seconds).
fn default_failover_timeout() -> u64 {
    30
}
290
291/// PVA retrieval RPC server configuration.
292#[derive(Debug, Clone, Serialize, Deserialize)]
293pub struct PvaConfig {
294    /// TCP port the PVA server listens on (default 5075).
295    #[serde(default = "default_pva_tcp_port")]
296    pub tcp_port: u16,
297    /// UDP port for PVA search/beacon (default 5076).
298    #[serde(default = "default_pva_udp_port")]
299    pub udp_port: u16,
300}
301
/// Serde default for `PvaConfig::tcp_port`.
fn default_pva_tcp_port() -> u16 {
    5075
}
305
/// Serde default for `PvaConfig::udp_port`.
fn default_pva_udp_port() -> u16 {
    5076
}
309
310impl Default for PvaConfig {
311    fn default() -> Self {
312        Self {
313            tcp_port: default_pva_tcp_port(),
314            udp_port: default_pva_udp_port(),
315        }
316    }
317}
318
319impl ArchiverConfig {
320    pub fn from_toml(s: &str) -> Result<Self, toml::de::Error> {
321        toml::from_str(s)
322    }
323
324    /// Validate configuration values that TOML deserialization alone cannot check.
325    pub fn validate(&self) -> anyhow::Result<()> {
326        if self.engine.write_period_secs == 0 {
327            // The write_loop drives flushes via tokio::time::interval,
328            // which panics on Duration::ZERO. The legacy elapsed-check
329            // tolerated 0 but the ticker path doesn't, so reject it
330            // at config load instead of letting the engine crash on
331            // first start. 1s is the smallest sensible value (sub-
332            // second flush rates serve no archiving purpose).
333            anyhow::bail!("engine.write_period_secs must be > 0");
334        }
335        if self.engine.write_shards == 0 {
336            anyhow::bail!(
337                "engine.write_shards must be > 0 (use 1 for the legacy single-worker layout)"
338            );
339        }
340        if self.engine.per_shard_buffer == 0 && self.engine.write_shards > 1 {
341            anyhow::bail!(
342                "engine.per_shard_buffer must be > 0 when write_shards > 1; \
343                 a 0-capacity shard channel would drop every sample"
344            );
345        }
346        for (name, tier) in [
347            ("sts", &self.storage.sts),
348            ("mts", &self.storage.mts),
349            ("lts", &self.storage.lts),
350        ] {
351            if tier.gather >= tier.hold {
352                anyhow::bail!(
353                    "{name}: gather ({}) must be less than hold ({})",
354                    tier.gather,
355                    tier.hold,
356                );
357            }
358        }
359        if let Some(ref cluster) = self.cluster {
360            if cluster.peer_timeout_secs == 0 {
361                anyhow::bail!("cluster.peer_timeout_secs must be > 0");
362            }
363            if cluster.cache_ttl_secs == 0 {
364                anyhow::bail!("cluster.cache_ttl_secs must be > 0");
365            }
366            // When external API keys are enabled, each peer must have an outbound
367            // credential — either its own `api_key` or the cluster-level fallback.
368            if self.api_keys.is_some() && !cluster.peers.is_empty() {
369                let has_fallback = cluster.api_key.is_some();
370                for (i, peer) in cluster.peers.iter().enumerate() {
371                    if peer.api_key.is_none() && !has_fallback {
372                        anyhow::bail!(
373                            "cluster.peers[{i}] ({}) has no api_key and no cluster.api_key fallback; \
374                             proxied write requests to this peer will be rejected",
375                            peer.name
376                        );
377                    }
378                }
379            }
380            for (i, peer) in cluster.peers.iter().enumerate() {
381                if !peer.mgmt_url.starts_with("http://") && !peer.mgmt_url.starts_with("https://") {
382                    anyhow::bail!(
383                        "cluster.peers[{i}].mgmt_url must start with http:// or https://"
384                    );
385                }
386                if !peer.retrieval_url.starts_with("http://")
387                    && !peer.retrieval_url.starts_with("https://")
388                {
389                    anyhow::bail!(
390                        "cluster.peers[{i}].retrieval_url must start with http:// or https://"
391                    );
392                }
393            }
394        }
395        Ok(())
396    }
397}
398
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal three-tier `[storage.*]` section shared by every fixture.
    const STORAGE: &str = r#"
[storage.sts]
root_folder = "/tmp/sts"
partition_granularity = "hour"

[storage.mts]
root_folder = "/tmp/mts"
partition_granularity = "day"

[storage.lts]
root_folder = "/tmp/lts"
partition_granularity = "year"
"#;

    /// `[cluster.identity]` section describing the local `appliance0`.
    const IDENTITY: &str = r#"
[cluster.identity]
name = "appliance0"
mgmt_url = "http://host0:17665/mgmt/bpl"
retrieval_url = "http://host0:17665/retrieval"
engine_url = "http://host0:17665"
etl_url = "http://host0:17665"
"#;

    /// Top-level key that turns on external API-key authentication. Must
    /// precede every table header in the assembled document.
    const API_KEYS: &str = r#"
api_keys = ["secret"]
"#;

    /// `[cluster]` table carrying the shared fallback credential.
    const CLUSTER_FALLBACK_KEY: &str = r#"
[cluster]
api_key = "shared-fallback"
"#;

    /// Render one `[[cluster.peers]]` entry for `appliance{n}` on `host{n}`,
    /// optionally with a per-peer `api_key`.
    fn peer(n: u32, api_key: Option<&str>) -> String {
        let mut entry = format!(
            "\n[[cluster.peers]]\n\
             name = \"appliance{n}\"\n\
             mgmt_url = \"http://host{n}:17665/mgmt/bpl\"\n\
             retrieval_url = \"http://host{n}:17665/retrieval\"\n"
        );
        if let Some(key) = api_key {
            entry.push_str(&format!("api_key = \"{key}\"\n"));
        }
        entry
    }

    /// Concatenate TOML fragments in order and parse them, panicking on a
    /// deserialization error.
    fn parse(sections: &[&str]) -> ArchiverConfig {
        ArchiverConfig::from_toml(&sections.concat()).unwrap()
    }

    #[test]
    fn parse_config_without_cluster() {
        let config = parse(&[STORAGE]);
        assert!(config.cluster.is_none());
    }

    #[test]
    fn parse_config_with_cluster() {
        let config = parse(&[STORAGE, IDENTITY, peer(1, None).as_str()]);
        let cluster = config.cluster.unwrap();
        assert_eq!(cluster.identity.name, "appliance0");
        assert_eq!(cluster.peers.len(), 1);
        assert_eq!(cluster.peers[0].name, "appliance1");
        assert_eq!(cluster.cache_ttl_secs, 300);
        assert_eq!(cluster.peer_timeout_secs, 30);
    }

    #[test]
    fn validate_cluster_api_key_required_with_api_keys() {
        let config = parse(&[API_KEYS, STORAGE, IDENTITY, peer(1, None).as_str()]);
        let err = config.validate().unwrap_err();
        assert!(
            err.to_string()
                .contains("has no api_key and no cluster.api_key fallback")
        );
    }

    #[test]
    fn validate_cluster_api_key_not_required_without_api_keys() {
        let config = parse(&[STORAGE, IDENTITY, peer(1, None).as_str()]);
        config.validate().unwrap(); // No api_keys → no requirement for cluster.api_key
    }

    #[test]
    fn validate_per_peer_keys_without_fallback() {
        // Each peer has its own api_key → passes even without cluster.api_key.
        let config = parse(&[
            API_KEYS,
            STORAGE,
            IDENTITY,
            peer(1, Some("peer1-key")).as_str(),
        ]);
        config.validate().unwrap();
    }

    #[test]
    fn validate_mixed_per_peer_and_fallback() {
        // One peer has its own key, another relies on the fallback → passes.
        let config = parse(&[
            API_KEYS,
            STORAGE,
            CLUSTER_FALLBACK_KEY,
            IDENTITY,
            peer(1, Some("peer1-specific")).as_str(),
            peer(2, None).as_str(),
        ]);
        config.validate().unwrap();
    }

    #[test]
    fn parse_peer_api_key_from_toml() {
        let config = parse(&[
            STORAGE,
            IDENTITY,
            peer(1, Some("peer1-secret")).as_str(),
            peer(2, None).as_str(),
        ]);
        let cluster = config.cluster.unwrap();
        assert_eq!(cluster.peers[0].api_key.as_deref(), Some("peer1-secret"));
        assert_eq!(cluster.peers[1].api_key, None);
    }
}