Skip to main content

s2_lite/
init.rs

1//! Declarative basin/stream initialization from a JSON spec file.
2//!
3//! Loaded at startup when `--init-file` / `S2LITE_INIT_FILE` is set.
4
5use std::path::Path;
6
7use s2_common::{
8    resource_spec::{self, ResourcesSpec},
9    types::{
10        basin::BasinName,
11        config::{BasinConfig, OptionalStreamConfig},
12        resources::ProvisionMode,
13        stream::StreamName,
14    },
15};
16use tracing::info;
17
18use crate::backend::Backend;
19
20pub fn load(path: &Path) -> eyre::Result<ResourcesSpec> {
21    let contents = std::fs::read_to_string(path)
22        .map_err(|e| eyre::eyre!("failed to read init file {:?}: {}", path, e))?;
23    let spec: ResourcesSpec = serde_json::from_str(&contents)
24        .map_err(|e| eyre::eyre!("failed to parse init file {:?}: {}", path, e))?;
25    Ok(spec)
26}
27
28pub async fn apply(backend: &Backend, spec: ResourcesSpec) -> eyre::Result<()> {
29    resource_spec::validate(&spec).map_err(|e| eyre::eyre!(e))?;
30
31    for basin_spec in spec.basins {
32        let basin: BasinName = basin_spec
33            .name
34            .parse()
35            .map_err(|e| eyre::eyre!("invalid basin name {:?}: {}", basin_spec.name, e))?;
36
37        let config = basin_spec.config.map(BasinConfig::from).unwrap_or_default();
38
39        backend
40            .provision_basin(basin.clone(), config, ProvisionMode::Ensure)
41            .await
42            .map_err(|e| eyre::eyre!("failed to apply basin {:?}: {}", basin.as_ref(), e))?;
43
44        info!(basin = basin.as_ref(), "basin applied");
45
46        for stream_spec in basin_spec.streams {
47            let stream: StreamName = stream_spec
48                .name
49                .parse()
50                .map_err(|e| eyre::eyre!("invalid stream name {:?}: {}", stream_spec.name, e))?;
51
52            let config = stream_spec
53                .config
54                .map(OptionalStreamConfig::from)
55                .unwrap_or_default();
56
57            backend
58                .provision_stream(basin.clone(), stream.clone(), config, ProvisionMode::Ensure)
59                .await
60                .map_err(|e| {
61                    eyre::eyre!(
62                        "failed to apply stream {:?}/{:?}: {}",
63                        basin.as_ref(),
64                        stream.as_ref(),
65                        e
66                    )
67                })?;
68
69            info!(
70                basin = basin.as_ref(),
71                stream = stream.as_ref(),
72                "stream applied"
73            );
74        }
75    }
76    Ok(())
77}