// netidx_archive/config.rs

1use self::file::RecordShardConfig;
2use anyhow::Result;
3use arcstr::ArcStr;
4use derive_builder::Builder;
5use netidx::{
6    config::Config as NetIdxCfg,
7    path::Path,
8    protocol::glob::Glob,
9    publisher::BindCfg,
10    resolver_client::{DesiredAuth, GlobSet},
11};
12use serde_derive::{Deserialize, Serialize};
13use std::{collections::HashMap, path::{PathBuf, Path as FilePath}, time::Duration};
14
15pub mod file {
16    use arcstr::literal;
17
18    use super::*;
19
20    pub fn default_max_sessions() -> usize {
21        512
22    }
23
24    pub fn default_max_sessions_per_client() -> usize {
25        64
26    }
27
28    pub fn default_oneshot_data_limit() -> usize {
29        104857600
30    }
31
32    pub fn default_cluster() -> String {
33        "cluster".into()
34    }
35
36    #[derive(Debug, Clone, Serialize, Deserialize)]
37    #[serde(deny_unknown_fields)]
38    pub struct PublishConfig {
39        pub base: Path,
40        #[serde(default)]
41        pub bind: Option<String>,
42        #[serde(default = "default_max_sessions")]
43        pub max_sessions: usize,
44        #[serde(default = "default_max_sessions_per_client")]
45        pub max_sessions_per_client: usize,
46        #[serde(default = "default_oneshot_data_limit")]
47        pub oneshot_data_limit: usize,
48        #[serde(default)]
49        pub cluster_shards: Option<usize>,
50        #[serde(default = "default_cluster")]
51        pub cluster: String,
52    }
53
54    impl Default for PublishConfig {
55        fn default() -> Self {
56            Self {
57                base: Path::from("/archive"),
58                bind: None,
59                max_sessions: default_max_sessions(),
60                max_sessions_per_client: default_max_sessions_per_client(),
61                oneshot_data_limit: default_oneshot_data_limit(),
62                cluster_shards: Some(0),
63                cluster: default_cluster(),
64            }
65        }
66    }
67
68    impl PublishConfig {
69        pub fn example() -> Self {
70            Self::default()
71        }
72    }
73
74    pub fn default_poll_interval() -> Option<Duration> {
75        Some(Duration::from_secs(5))
76    }
77
78    pub fn default_image_frequency() -> Option<usize> {
79        Some(67108864)
80    }
81
82    pub fn default_flush_frequency() -> Option<usize> {
83        Some(65534)
84    }
85
86    pub fn default_flush_interval() -> Option<Duration> {
87        Some(Duration::from_secs(30))
88    }
89
90    pub fn default_rotate_interval() -> RotateDirective {
91        RotateDirective::Interval(Duration::from_secs(86400))
92    }
93
94    pub fn default_slack() -> usize {
95        100
96    }
97
98    #[derive(Debug, Clone, Serialize, Deserialize)]
99    #[serde(deny_unknown_fields)]
100    pub struct RecordShardConfig {
101        /// The globs defining the path space this shard will
102        /// record. This MUST be disjoint from the path space recorded
103        /// by any other shard.
104        ///
105        /// If spec is empty, then no recorder task will be started
106        /// for the shard, however the shard's `ArchiveCollectionWriter`
107        /// will be available so you can log to the shard directly.
108        pub spec: Vec<ArcStr>,
109        /// override the poll_interval for this shard
110        pub poll_interval: Option<Duration>,
111        /// override the image_frequency for this shard
112        pub image_frequency: Option<usize>,
113        /// override the flush_frequency for this shard
114        pub flush_frequency: Option<usize>,
115        /// override the flush_interval for this shard
116        pub flush_interval: Option<Duration>,
117        /// override the rotate_interval for this shard
118        pub rotate_interval: Option<RotateDirective>,
119        /// how much channel slack between subscriber and the recorder
120        /// task should this shard have. Higher numbers use more
121        /// memory but will reduce pushback on the publisher when the
122        /// disk is busy.
123        #[serde(default = "default_slack")]
124        pub slack: usize,
125    }
126
127    impl Default for RecordShardConfig {
128        fn default() -> Self {
129            Self {
130                spec: vec![literal!("/tmp/**")],
131                poll_interval: None,
132                image_frequency: None,
133                flush_frequency: None,
134                flush_interval: None,
135                rotate_interval: None,
136                slack: default_slack(),
137            }
138        }
139    }
140
141    impl RecordShardConfig {
142        pub fn example() -> Self {
143            Self::default()
144        }
145    }
146
147    #[derive(Debug, Clone, Serialize, Deserialize)]
148    #[serde(deny_unknown_fields)]
149    pub struct RecordConfig {
150        #[serde(default = "default_poll_interval")]
151        pub poll_interval: Option<Duration>,
152        #[serde(default = "default_image_frequency")]
153        pub image_frequency: Option<usize>,
154        #[serde(default = "default_flush_frequency")]
155        pub flush_frequency: Option<usize>,
156        #[serde(default = "default_flush_interval")]
157        pub flush_interval: Option<Duration>,
158        #[serde(default = "default_rotate_interval")]
159        pub rotate_interval: RotateDirective,
160        pub shards: HashMap<ArcStr, RecordShardConfig>,
161    }
162
163    impl Default for RecordConfig {
164        fn default() -> Self {
165            Self {
166                poll_interval: default_poll_interval(),
167                image_frequency: default_image_frequency(),
168                flush_frequency: default_flush_frequency(),
169                flush_interval: default_flush_interval(),
170                rotate_interval: default_rotate_interval(),
171                shards: HashMap::from([("0".into(), RecordShardConfig::example())]),
172            }
173        }
174    }
175
176    impl RecordConfig {
177        pub fn example() -> Self {
178            Self::default()
179        }
180    }
181
182    #[derive(Debug, Clone, Serialize, Deserialize)]
183    #[serde(deny_unknown_fields)]
184    pub struct Config {
185        pub archive_directory: PathBuf,
186        #[serde(default)]
187        pub archive_cmds: Option<ArchiveCmds>,
188        #[serde(default)]
189        pub netidx_config: Option<PathBuf>,
190        #[serde(default)]
191        pub desired_auth: Option<DesiredAuth>,
192        #[serde(default)]
193        pub record: Option<RecordConfig>,
194        #[serde(default)]
195        pub publish: Option<PublishConfig>,
196    }
197
198    impl Default for Config {
199        fn default() -> Self {
200            Self {
201                archive_directory: PathBuf::from("/foo/bar"),
202                archive_cmds: Some(ArchiveCmds {
203                    list: (
204                        "cmd_to_list_dates_in_archive".into(),
205                        vec!["-s".into(), "{shard}".into()],
206                    ),
207                    get: (
208                        "cmd_to_fetch_file_from_archive".into(),
209                        vec!["-s".into(), "{shard}".into()],
210                    ),
211                    put: (
212                        "cmd_to_put_file_into_archive".into(),
213                        vec!["-s".into(), "{shard}".into()],
214                    ),
215                }),
216                netidx_config: None,
217                desired_auth: None,
218                record: Some(RecordConfig::example()),
219                publish: Some(PublishConfig::example()),
220            }
221        }
222    }
223
224    impl Config {
225        pub fn example() -> String {
226            serde_json::to_string_pretty(&Self::default()).unwrap()
227        }
228    }
229}
230
231#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
232#[serde(deny_unknown_fields)]
233pub enum RotateDirective {
234    Interval(Duration),
235    Size(usize),
236    Never,
237}
238
239/// Configuration of the publish part of the recorder
240#[derive(Debug, Clone, Builder)]
241pub struct PublishConfig {
242    /// The base path to publish under
243    #[builder(setter(into))]
244    pub(crate) base: Path,
245    /// The publisher bind config.
246    pub(crate) bind: BindCfg,
247    /// The maximum number of client sessions
248    #[builder(default = "file::default_max_sessions()")]
249    pub(crate) max_sessions: usize,
250    /// The maximum number of sessions per unique client
251    #[builder(default = "file::default_max_sessions_per_client()")]
252    pub(crate) max_sessions_per_client: usize,
253    /// The maximum number of bytes a oneshot will return
254    #[builder(default = "file::default_oneshot_data_limit()")]
255    pub(crate) oneshot_data_limit: usize,
256    /// How many external shards there are. e.g. instances on other
257    /// machines. This is used to sync up the cluster.
258    #[builder(default = "None")]
259    pub(crate) cluster_shards: Option<usize>,
260    /// The cluster name to join, default is "cluster".
261    #[builder(default = "file::default_cluster()")]
262    pub(crate) cluster: String,
263}
264
265impl PublishConfigBuilder {
266    /// Set the bind config to the default bind config from the netidx cfg
267    pub fn bind_from_cfg(&mut self, cfg: &NetIdxCfg) -> &mut Self {
268        self.bind = Some(cfg.default_bind_config.clone());
269        self
270    }
271}
272
273impl PublishConfig {
274    fn from_file(netidx_cfg: &NetIdxCfg, f: file::PublishConfig) -> Result<Self> {
275        Ok(Self {
276            base: f.base,
277            bind: f
278                .bind
279                .map(|s| s.parse::<BindCfg>())
280                .unwrap_or_else(|| Ok(netidx_cfg.default_bind_config.clone()))?,
281            max_sessions: f.max_sessions,
282            max_sessions_per_client: f.max_sessions_per_client,
283            oneshot_data_limit: f.oneshot_data_limit,
284            cluster_shards: f.cluster_shards,
285            cluster: f.cluster,
286        })
287    }
288}
289
290/// Configuration of the record part of the recorder
291#[derive(Debug, Clone, Builder)]
292pub struct RecordConfig {
293    /// The path spec globs to record. If you set this to an empty
294    /// `GlobSet` then no record task will be started for this shard.
295    /// You can then take the `LogfileCollectionWriter` from `Shards`
296    /// and write to it manually.
297    #[builder(try_setter, setter, default = "Self::default_spec()")]
298    pub(crate) spec: GlobSet,
299    /// how often to poll the resolver for structure changes. None
300    /// means only once at startup.
301    #[builder(default = "file::default_poll_interval()")]
302    pub(crate) poll_interval: Option<Duration>,
303    /// how often to write a full image. None means never write
304    /// images. Ignored if spec is empty.
305    #[builder(default = "file::default_image_frequency()")]
306    pub(crate) image_frequency: Option<usize>,
307    /// flush the file after the specified number of pages have been
308    /// written. None means never flush. Ignored if spec is empty.
309    #[builder(default = "file::default_flush_frequency()")]
310    pub(crate) flush_frequency: Option<usize>,
311    /// flush the file after the specified elapsed time. None means
312    /// flush only on shutdown. Ignored if spec is empty.
313    #[builder(default = "file::default_flush_interval()")]
314    pub(crate) flush_interval: Option<Duration>,
315    /// rotate the log file at the specified interval or file size or
316    /// never. Ignored if spec is empty.
317    #[builder(default = "file::default_rotate_interval()")]
318    pub(crate) rotate_interval: RotateDirective,
319    /// how much channel slack to allocate. Ignored if spec is empty.
320    #[builder(default = "file::default_slack()")]
321    pub(crate) slack: usize,
322}
323
324impl RecordConfigBuilder {
325    fn default_spec() -> GlobSet {
326        GlobSet::new(true, [Glob::new("/**".into()).unwrap()]).unwrap()
327    }
328}
329
330impl RecordConfig {
331    fn from_file(f: file::RecordConfig) -> Result<HashMap<ArcStr, RecordConfig>> {
332        let mut shards = HashMap::default();
333        for (name, c) in f.shards {
334            let RecordShardConfig {
335                spec,
336                poll_interval,
337                image_frequency,
338                flush_frequency,
339                flush_interval,
340                rotate_interval,
341                slack,
342            } = c;
343            let res = RecordConfig {
344                spec: GlobSet::new(
345                    true,
346                    spec.into_iter().map(Glob::new).collect::<Result<Vec<_>>>()?,
347                )?,
348                poll_interval: poll_interval.or(f.poll_interval),
349                image_frequency: image_frequency.or(f.image_frequency),
350                flush_frequency: flush_frequency.or(f.flush_frequency),
351                flush_interval: flush_interval.or(f.flush_interval),
352                rotate_interval: rotate_interval.unwrap_or(f.rotate_interval),
353                slack,
354            };
355            shards.insert(name, res);
356        }
357        Ok(shards)
358    }
359}
360
361#[derive(Debug, Clone, Serialize, Deserialize)]
362pub struct ArchiveCmds {
363    pub list: (String, Vec<String>),
364    pub get: (String, Vec<String>),
365    pub put: (String, Vec<String>),
366}
367
368/// Configuration of the recorder
369#[derive(Debug, Clone, Builder)]
370#[builder(build_fn(validate = "Self::validate"))]
371pub struct Config {
372    /// The directory where the archive files live. The current
373    /// archive will be called 'current', the path mappings will be
374    /// called 'pathmap', and previous rotated archive files will be
375    /// named the rfc3339 timestamp that specifies when they were
376    /// rotated (and thus when they ended).
377    #[builder(setter(into))]
378    pub(crate) archive_directory: PathBuf,
379    #[builder(setter(strip_option), default)]
380    pub(crate) archive_cmds: Option<ArchiveCmds>,
381    /// The netidx config to use
382    #[builder(setter(strip_option), default)]
383    pub(crate) netidx_config: Option<NetIdxCfg>,
384    /// The netidx desired authentication mechanism to use
385    #[builder(default = "self.default_desired_auth()")]
386    pub(crate) desired_auth: DesiredAuth,
387    /// Record. Each entry in the HashMap is a shard, which will
388    /// record independently to an archive directory under the base
389    /// directory. E.G. a shard named "0" will record under
390    /// ${archive_base}/0. If publish is specified all configured
391    /// shards on this instance will be published.
392    #[builder(setter(into), default)]
393    pub(crate) record: HashMap<ArcStr, RecordConfig>,
394    /// If specified this recorder will publish the archive
395    /// directory. It is possible for the same archiver to both record
396    /// and publish. One of record or publish must be specifed.
397    #[builder(setter(strip_option), default)]
398    pub(crate) publish: Option<PublishConfig>,
399}
400
401impl ConfigBuilder {
402    fn validate(&self) -> std::result::Result<(), String> {
403        let record_empty = self.record.as_ref().map(|t| t.is_empty()).unwrap_or(true);
404        let publish_empty = self.publish.is_none();
405        if record_empty && publish_empty {
406            return Err("config must specify at least record, publish, or both".into());
407        }
408        Ok(())
409    }
410
411    fn default_desired_auth(&self) -> DesiredAuth {
412        self.netidx_config
413            .as_ref()
414            .and_then(|n| n.as_ref())
415            .map(|c| c.default_auth())
416            .unwrap_or(DesiredAuth::Local)
417    }
418}
419
420impl TryFrom<file::Config> for Config {
421    type Error = anyhow::Error;
422
423    fn try_from(f: file::Config) -> Result<Self> {
424        let netidx_config = f
425            .netidx_config
426            .map(NetIdxCfg::load)
427            .unwrap_or_else(|| NetIdxCfg::load_default())?;
428        let desired_auth = f.desired_auth.unwrap_or_else(|| netidx_config.default_auth());
429        let publish =
430            f.publish.map(|f| PublishConfig::from_file(&netidx_config, f)).transpose()?;
431        Ok(Self {
432            archive_directory: f.archive_directory,
433            archive_cmds: f.archive_cmds,
434            netidx_config: Some(netidx_config),
435            desired_auth,
436            record: f
437                .record
438                .map(|r| RecordConfig::from_file(r))
439                .transpose()?
440                .unwrap_or(HashMap::default()),
441            publish,
442        })
443    }
444}
445
446impl Config {
447    /// Load the config from the specified file
448    pub async fn load<F: AsRef<std::path::Path>>(file: F) -> Result<Config> {
449        let s = tokio::fs::read(file).await?;
450        let f = serde_json::from_slice::<file::Config>(&s)?;
451        Config::try_from(f)
452    }
453
454    pub fn example() -> String {
455        file::Config::example()
456    }
457
458    pub fn archive_directory(&self) -> &FilePath {
459        &self.archive_directory
460    }
461}