Skip to main content

s2_lite/
init.rs

1//! Declarative basin/stream initialization from a JSON spec file.
2//!
3//! Loaded at startup when `--init-file` / `S2LITE_INIT_FILE` is set.
4
5use std::{borrow::Cow, path::Path, time::Duration};
6
7use s2_common::types::{
8    basin::BasinName,
9    config::{
10        BasinConfig, OptionalDeleteOnEmptyConfig, OptionalStreamConfig, OptionalTimestampingConfig,
11        RetentionPolicy, StorageClass, TimestampingMode,
12    },
13    resources::ProvisionMode,
14    stream::StreamName,
15};
16use serde::{Deserialize, Serialize};
17use tracing::info;
18
19use crate::backend::Backend;
20
// Root of the init spec file: the basins (and their streams) that `apply` provisions.
// Unlike the nested spec types, this struct does not set `deny_unknown_fields`.
#[derive(Debug, Deserialize, Default, schemars::JsonSchema)]
pub struct ResourcesSpec {
    // Basins to provision; an absent `basins` key deserializes to an empty list.
    #[serde(default)]
    pub basins: Vec<BasinSpec>,
}
26
// One basin entry of the init spec, together with the streams to create inside it.
// Unknown JSON keys are rejected at deserialization time (`deny_unknown_fields`).
#[derive(Debug, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct BasinSpec {
    // Raw basin name; kept as a `String` here and parsed into a `BasinName` by
    // `validate` / `apply`.
    pub name: String,
    // Optional basin configuration; `apply` falls back to `BasinConfig::default()` when absent.
    #[serde(default)]
    pub config: Option<BasinConfigSpec>,
    // Streams to provision in this basin; defaults to an empty list.
    #[serde(default)]
    pub streams: Vec<StreamSpec>,
}
36
// One stream entry nested under a basin in the init spec.
// Unknown JSON keys are rejected at deserialization time (`deny_unknown_fields`).
#[derive(Debug, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct StreamSpec {
    // Raw stream name; parsed into a `StreamName` by `validate` / `apply`.
    pub name: String,
    // Optional stream configuration; `apply` falls back to
    // `OptionalStreamConfig::default()` when absent.
    #[serde(default)]
    pub config: Option<StreamConfigSpec>,
}
44
// JSON-facing mirror of `BasinConfig`. Every field is optional; the
// `From<BasinConfigSpec> for BasinConfig` impl substitutes defaults for omitted fields.
#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct BasinConfigSpec {
    // Stream configuration stored as the basin's `default_stream_config`.
    #[serde(default)]
    pub default_stream_config: Option<StreamConfigSpec>,
    /// Encryption algorithm to apply to newly created streams in the basin.
    #[serde(default)]
    pub stream_cipher: Option<EncryptionAlgorithmSpec>,
    /// Create stream on append if it doesn't exist, using the default stream configuration.
    #[serde(default)]
    pub create_stream_on_append: Option<bool>,
    /// Create stream on read if it doesn't exist, using the default stream configuration.
    #[serde(default)]
    pub create_stream_on_read: Option<bool>,
}
60
// JSON-facing mirror of `OptionalStreamConfig`. Every field is optional and is carried
// over by the `From<StreamConfigSpec> for OptionalStreamConfig` impl.
#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct StreamConfigSpec {
    /// Storage class for recent writes.
    #[serde(default)]
    pub storage_class: Option<StorageClassSpec>,
    /// Retention policy for the stream. If unspecified, the default is to retain records for 7
    /// days.
    #[serde(default)]
    pub retention_policy: Option<RetentionPolicySpec>,
    /// Timestamping behavior.
    #[serde(default)]
    pub timestamping: Option<TimestampingSpec>,
    /// Delete-on-empty configuration.
    #[serde(default)]
    pub delete_on_empty: Option<DeleteOnEmptySpec>,
}
78
/// Storage class as written in the init file.
///
/// Serialized in kebab-case (`"standard"`, `"express"`) and converted to [`StorageClass`]
/// through the `From` impl in this file.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum StorageClassSpec {
    /// Maps to `StorageClass::Standard`.
    Standard,
    /// Maps to `StorageClass::Express`.
    Express,
}
85
86impl schemars::JsonSchema for StorageClassSpec {
87    fn schema_name() -> Cow<'static, str> {
88        "StorageClassSpec".into()
89    }
90
91    fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
92        schemars::json_schema!({
93            "type": "string",
94            "description": "Storage class for recent writes.",
95            "enum": ["standard", "express"]
96        })
97    }
98}
99
100impl From<StorageClassSpec> for StorageClass {
101    fn from(s: StorageClassSpec) -> Self {
102        match s {
103            StorageClassSpec::Standard => StorageClass::Standard,
104            StorageClassSpec::Express => StorageClass::Express,
105        }
106    }
107}
108
/// Encryption algorithm as written in the init file.
///
/// Each variant has an explicit serde name because the wire names (`"aegis-256"`,
/// `"aes-256-gcm"`) are spelled out per variant rather than derived via `rename_all`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub enum EncryptionAlgorithmSpec {
    /// Serialized as `"aegis-256"`.
    #[serde(rename = "aegis-256")]
    Aegis256,
    /// Serialized as `"aes-256-gcm"`.
    #[serde(rename = "aes-256-gcm")]
    Aes256Gcm,
}
116
117impl schemars::JsonSchema for EncryptionAlgorithmSpec {
118    fn schema_name() -> Cow<'static, str> {
119        "EncryptionAlgorithmSpec".into()
120    }
121
122    fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
123        schemars::json_schema!({
124            "type": "string",
125            "description": "Encryption algorithm to apply to newly created streams in the basin.",
126            "enum": ["aegis-256", "aes-256-gcm"]
127        })
128    }
129}
130
131impl From<EncryptionAlgorithmSpec> for s2_common::encryption::EncryptionAlgorithm {
132    fn from(m: EncryptionAlgorithmSpec) -> Self {
133        match m {
134            EncryptionAlgorithmSpec::Aegis256 => Self::Aegis256,
135            EncryptionAlgorithmSpec::Aes256Gcm => Self::Aes256Gcm,
136        }
137    }
138}
139
/// Accepts `"infinite"` or a humantime duration string such as `"7d"`, `"1w"`.
///
/// Newtype around [`RetentionPolicy`]; deserialization goes through the
/// `TryFrom<String>` impl, which matches `"infinite"` case-insensitively and parses
/// anything else with `humantime::parse_duration`.
#[derive(Debug, Clone, Copy)]
pub struct RetentionPolicySpec(pub RetentionPolicy);
143
144impl RetentionPolicySpec {
145    pub fn age_secs(self) -> Option<u64> {
146        self.0.age().map(|d| d.as_secs())
147    }
148}
149
150impl TryFrom<String> for RetentionPolicySpec {
151    type Error = String;
152
153    fn try_from(s: String) -> Result<Self, Self::Error> {
154        if s.eq_ignore_ascii_case("infinite") {
155            return Ok(RetentionPolicySpec(RetentionPolicy::Infinite()));
156        }
157        let d = humantime::parse_duration(&s)
158            .map_err(|e| format!("invalid retention_policy {:?}: {}", s, e))?;
159        Ok(RetentionPolicySpec(RetentionPolicy::Age(d)))
160    }
161}
162
163impl<'de> Deserialize<'de> for RetentionPolicySpec {
164    fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
165        let s = String::deserialize(d)?;
166        RetentionPolicySpec::try_from(s).map_err(serde::de::Error::custom)
167    }
168}
169
170impl schemars::JsonSchema for RetentionPolicySpec {
171    fn schema_name() -> Cow<'static, str> {
172        "RetentionPolicySpec".into()
173    }
174
175    fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
176        schemars::json_schema!({
177            "type": "string",
178            "description": "Retain records unless explicitly trimmed (\"infinite\"), or automatically \
179                trim records older than the given duration (e.g. \"7days\", \"1week\").",
180            "examples": ["infinite", "7days", "1week"]
181        })
182    }
183}
184
// JSON-facing mirror of `OptionalTimestampingConfig`; both fields are optional and are
// passed through by the `From<TimestampingSpec>` impl.
#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct TimestampingSpec {
    /// Timestamping mode for appends that influences how timestamps are handled.
    #[serde(default)]
    pub mode: Option<TimestampingModeSpec>,
    /// Allow client-specified timestamps to exceed the arrival time.
    /// If this is `false` or not set, client timestamps will be capped at the arrival time.
    #[serde(default)]
    pub uncapped: Option<bool>,
}
196
/// Timestamping mode as written in the init file.
///
/// Serialized in kebab-case (`"client-prefer"`, `"client-require"`, `"arrival"`) and
/// converted to [`TimestampingMode`] through the `From` impl in this file.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum TimestampingModeSpec {
    /// Maps to `TimestampingMode::ClientPrefer`.
    ClientPrefer,
    /// Maps to `TimestampingMode::ClientRequire`.
    ClientRequire,
    /// Maps to `TimestampingMode::Arrival`.
    Arrival,
}
204
205impl schemars::JsonSchema for TimestampingModeSpec {
206    fn schema_name() -> Cow<'static, str> {
207        "TimestampingModeSpec".into()
208    }
209
210    fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
211        schemars::json_schema!({
212            "type": "string",
213            "description": "Timestamping mode for appends that influences how timestamps are handled.",
214            "enum": ["client-prefer", "client-require", "arrival"]
215        })
216    }
217}
218
219impl From<TimestampingModeSpec> for TimestampingMode {
220    fn from(m: TimestampingModeSpec) -> Self {
221        match m {
222            TimestampingModeSpec::ClientPrefer => TimestampingMode::ClientPrefer,
223            TimestampingModeSpec::ClientRequire => TimestampingMode::ClientRequire,
224            TimestampingModeSpec::Arrival => TimestampingMode::Arrival,
225        }
226    }
227}
228
// JSON-facing mirror of `OptionalDeleteOnEmptyConfig`; `min_age` is carried over as a
// plain `Duration` by the `From<DeleteOnEmptySpec>` impl.
#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct DeleteOnEmptySpec {
    /// Minimum age before an empty stream can be deleted.
    /// Set to 0 (default) to disable delete-on-empty (don't delete automatically).
    #[serde(default)]
    pub min_age: Option<HumanDuration>,
}
237
/// A `std::time::Duration` deserialized from a humantime string (e.g. `"1d"`, `"2h 30m"`).
///
/// Deserialization reads a string and parses it with `humantime::parse_duration`; the
/// parsed value is exposed as the public tuple field.
#[derive(Debug, Clone, Copy)]
pub struct HumanDuration(pub Duration);
241
242impl TryFrom<String> for HumanDuration {
243    type Error = String;
244
245    fn try_from(s: String) -> Result<Self, Self::Error> {
246        humantime::parse_duration(&s)
247            .map(HumanDuration)
248            .map_err(|e| format!("invalid duration {:?}: {}", s, e))
249    }
250}
251
252impl<'de> Deserialize<'de> for HumanDuration {
253    fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
254        let s = String::deserialize(d)?;
255        HumanDuration::try_from(s).map_err(serde::de::Error::custom)
256    }
257}
258
259impl schemars::JsonSchema for HumanDuration {
260    fn schema_name() -> Cow<'static, str> {
261        "HumanDuration".into()
262    }
263
264    fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
265        schemars::json_schema!({
266            "type": "string",
267            "description": "A duration string in humantime format, e.g. \"1day\", \"2h 30m\"",
268            "examples": ["1day", "2h 30m"]
269        })
270    }
271}
272
273impl From<BasinConfigSpec> for BasinConfig {
274    fn from(s: BasinConfigSpec) -> Self {
275        BasinConfig {
276            default_stream_config: s.default_stream_config.map(Into::into).unwrap_or_default(),
277            stream_cipher: s.stream_cipher.map(Into::into),
278            create_stream_on_append: s.create_stream_on_append.unwrap_or_default(),
279            create_stream_on_read: s.create_stream_on_read.unwrap_or_default(),
280        }
281    }
282}
283
284impl From<TimestampingSpec> for OptionalTimestampingConfig {
285    fn from(s: TimestampingSpec) -> Self {
286        OptionalTimestampingConfig {
287            mode: s.mode.map(Into::into),
288            uncapped: s.uncapped,
289        }
290    }
291}
292
293impl From<DeleteOnEmptySpec> for OptionalDeleteOnEmptyConfig {
294    fn from(s: DeleteOnEmptySpec) -> Self {
295        OptionalDeleteOnEmptyConfig {
296            min_age: s.min_age.map(|h| h.0),
297        }
298    }
299}
300
301impl From<StreamConfigSpec> for OptionalStreamConfig {
302    fn from(s: StreamConfigSpec) -> Self {
303        OptionalStreamConfig {
304            storage_class: s.storage_class.map(Into::into),
305            retention_policy: s.retention_policy.map(|rp| rp.0),
306            timestamping: s.timestamping.map(Into::into).unwrap_or_default(),
307            delete_on_empty: s.delete_on_empty.map(Into::into).unwrap_or_default(),
308        }
309    }
310}
311
312pub fn json_schema() -> serde_json::Value {
313    serde_json::to_value(schemars::schema_for!(ResourcesSpec)).unwrap()
314}
315
316pub fn validate(spec: &ResourcesSpec) -> eyre::Result<()> {
317    let mut errors = Vec::new();
318    let mut seen_basins = std::collections::HashSet::new();
319
320    for basin_spec in &spec.basins {
321        if !seen_basins.insert(basin_spec.name.clone()) {
322            errors.push(format!("duplicate basin name {:?}", basin_spec.name));
323        }
324
325        if let Err(e) = basin_spec.name.parse::<BasinName>() {
326            errors.push(format!("invalid basin name {:?}: {}", basin_spec.name, e));
327            continue;
328        }
329
330        let mut seen_streams = std::collections::HashSet::new();
331        for stream_spec in &basin_spec.streams {
332            if !seen_streams.insert(stream_spec.name.clone()) {
333                errors.push(format!(
334                    "duplicate stream name {:?} in basin {:?}",
335                    stream_spec.name, basin_spec.name
336                ));
337            }
338            if let Err(e) = stream_spec.name.parse::<StreamName>() {
339                errors.push(format!(
340                    "invalid stream name {:?} in basin {:?}: {}",
341                    stream_spec.name, basin_spec.name, e
342                ));
343            }
344        }
345    }
346
347    if errors.is_empty() {
348        Ok(())
349    } else {
350        Err(eyre::eyre!("{}", errors.join("\n")))
351    }
352}
353
354pub fn load(path: &Path) -> eyre::Result<ResourcesSpec> {
355    let contents = std::fs::read_to_string(path)
356        .map_err(|e| eyre::eyre!("failed to read init file {:?}: {}", path, e))?;
357    let spec: ResourcesSpec = serde_json::from_str(&contents)
358        .map_err(|e| eyre::eyre!("failed to parse init file {:?}: {}", path, e))?;
359    Ok(spec)
360}
361
362pub async fn apply(backend: &Backend, spec: ResourcesSpec) -> eyre::Result<()> {
363    validate(&spec)?;
364
365    for basin_spec in spec.basins {
366        let basin: BasinName = basin_spec
367            .name
368            .parse()
369            .map_err(|e| eyre::eyre!("invalid basin name {:?}: {}", basin_spec.name, e))?;
370
371        let config = basin_spec.config.map(BasinConfig::from).unwrap_or_default();
372
373        backend
374            .provision_basin(basin.clone(), config, ProvisionMode::Ensure)
375            .await
376            .map_err(|e| eyre::eyre!("failed to apply basin {:?}: {}", basin.as_ref(), e))?;
377
378        info!(basin = basin.as_ref(), "basin applied");
379
380        for stream_spec in basin_spec.streams {
381            let stream: StreamName = stream_spec
382                .name
383                .parse()
384                .map_err(|e| eyre::eyre!("invalid stream name {:?}: {}", stream_spec.name, e))?;
385
386            let config = stream_spec
387                .config
388                .map(OptionalStreamConfig::from)
389                .unwrap_or_default();
390
391            backend
392                .provision_stream(basin.clone(), stream.clone(), config, ProvisionMode::Ensure)
393                .await
394                .map_err(|e| {
395                    eyre::eyre!(
396                        "failed to apply stream {:?}/{:?}: {}",
397                        basin.as_ref(),
398                        stream.as_ref(),
399                        e
400                    )
401                })?;
402
403            info!(
404                basin = basin.as_ref(),
405                stream = stream.as_ref(),
406                "stream applied"
407            );
408        }
409    }
410    Ok(())
411}
412
// Unit tests for spec deserialization, spec-to-config conversion, validation, and the
// generated JSON schema. None of them touch a `Backend`.
#[cfg(test)]
mod tests {
    use super::*;

    // Deserializes a spec from a JSON literal, panicking if the literal does not parse.
    fn parse_spec(json: &str) -> ResourcesSpec {
        serde_json::from_str(json).expect("valid JSON")
    }

    // An empty object is a valid spec with no basins.
    #[test]
    fn empty_spec() {
        let spec = parse_spec("{}");
        assert!(spec.basins.is_empty());
    }

    // A basin with only a name gets no config and an empty stream list.
    #[test]
    fn basin_no_config() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin"}]}"#);
        assert_eq!(spec.basins.len(), 1);
        assert_eq!(spec.basins[0].name, "my-basin");
        assert!(spec.basins[0].config.is_none());
        assert!(spec.basins[0].streams.is_empty());
    }

    // "infinite" maps to the infinite retention policy.
    #[test]
    fn retention_policy_infinite() {
        let rp: RetentionPolicySpec = serde_json::from_str(r#""infinite""#).expect("deserialize");
        assert!(matches!(rp.0, RetentionPolicy::Infinite()));
    }

    // A humantime string maps to an age-based policy with the parsed duration.
    #[test]
    fn retention_policy_duration() {
        let rp: RetentionPolicySpec = serde_json::from_str(r#""7days""#).expect("deserialize");
        assert!(matches!(rp.0, RetentionPolicy::Age(_)));
        if let RetentionPolicy::Age(d) = rp.0 {
            assert_eq!(d, Duration::from_secs(7 * 24 * 3600));
        }
    }

    // A string that is neither "infinite" nor a duration is rejected.
    #[test]
    fn retention_policy_invalid() {
        let err = serde_json::from_str::<RetentionPolicySpec>(r#""not-a-duration""#);
        assert!(err.is_err());
    }

    // `HumanDuration` parses a humantime string into the equivalent `Duration`.
    #[test]
    fn human_duration() {
        let hd: HumanDuration = serde_json::from_str(r#""1day""#).expect("deserialize");
        assert_eq!(hd.0, Duration::from_secs(86400));
    }

    // Deserializes a spec that exercises every config field and checks each parsed value.
    // Despite the name, this only covers deserialization; nothing is serialized back.
    #[test]
    fn full_spec_roundtrip() {
        let json = r#"
        {
          "basins": [
            {
              "name": "my-basin",
              "config": {
                "create_stream_on_append": true,
                "create_stream_on_read": false,
                "default_stream_config": {
                  "storage_class": "express",
                  "retention_policy": "7days",
                  "timestamping": {
                    "mode": "client-prefer",
                    "uncapped": false
                  },
                  "delete_on_empty": {
                    "min_age": "1day"
                  }
                }
              },
              "streams": [
                {
                  "name": "events",
                  "config": {
                    "storage_class": "standard",
                    "retention_policy": "infinite"
                  }
                }
              ]
            }
          ]
        }"#;

        let spec = parse_spec(json);
        assert_eq!(spec.basins.len(), 1);
        let basin = &spec.basins[0];
        assert_eq!(basin.name, "my-basin");

        let config = basin.config.as_ref().unwrap();
        assert_eq!(config.create_stream_on_append, Some(true));
        assert_eq!(config.create_stream_on_read, Some(false));

        let dsc = config.default_stream_config.as_ref().unwrap();
        assert!(matches!(dsc.storage_class, Some(StorageClassSpec::Express)));
        assert!(matches!(
            dsc.retention_policy.as_ref().map(|r| &r.0),
            Some(RetentionPolicy::Age(_))
        ));

        let ts = dsc.timestamping.as_ref().unwrap();
        assert!(matches!(ts.mode, Some(TimestampingModeSpec::ClientPrefer)));
        assert_eq!(ts.uncapped, Some(false));

        let doe = dsc.delete_on_empty.as_ref().unwrap();
        assert_eq!(
            doe.min_age.as_ref().map(|h| h.0),
            Some(Duration::from_secs(86400))
        );

        assert_eq!(basin.streams.len(), 1);
        let stream = &basin.streams[0];
        assert_eq!(stream.name, "events");
        let sc = stream.config.as_ref().unwrap();
        assert!(matches!(sc.storage_class, Some(StorageClassSpec::Standard)));
        assert!(matches!(
            sc.retention_policy.as_ref().map(|r| &r.0),
            Some(RetentionPolicy::Infinite())
        ));
    }

    // Omitted basin config fields fall back to defaults during conversion.
    #[test]
    fn basin_config_conversion() {
        let spec = BasinConfigSpec {
            default_stream_config: None,
            stream_cipher: None,
            create_stream_on_append: Some(true),
            create_stream_on_read: None,
        };
        let config = BasinConfig::from(spec);
        assert!(config.create_stream_on_append);
        assert!(!config.create_stream_on_read);
        assert_eq!(
            config.default_stream_config,
            OptionalStreamConfig::default()
        );
    }

    // A spec with valid, distinct names passes validation.
    #[test]
    fn validate_valid_spec() {
        let spec = parse_spec(
            r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"logs"}]}]}"#,
        );
        assert!(validate(&spec).is_ok());
    }

    // A basin name that fails `BasinName` parsing is reported.
    #[test]
    fn validate_invalid_basin_name() {
        let spec = parse_spec(r#"{"basins":[{"name":"INVALID_BASIN"}]}"#);
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("invalid basin name"));
    }

    // An empty stream name fails `StreamName` parsing and is reported.
    #[test]
    fn validate_invalid_stream_name() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin","streams":[{"name":""}]}]}"#);
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("invalid stream name"));
    }

    // Two basins with the same name are reported as a duplicate.
    #[test]
    fn validate_duplicate_basin_names() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin"},{"name":"my-basin"}]}"#);
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("duplicate basin name"));
    }

    // Two streams with the same name inside one basin are reported as a duplicate.
    #[test]
    fn validate_duplicate_stream_names() {
        let spec = parse_spec(
            r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"events"}]}]}"#,
        );
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("duplicate stream name"));
    }

    // Validation accumulates errors: an invalid name that is also duplicated yields both
    // messages in a single error.
    #[test]
    fn validate_multiple_errors() {
        let spec = parse_spec(r#"{"basins":[{"name":"INVALID"},{"name":"INVALID"}]}"#);
        let err = validate(&spec).unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("invalid basin name"));
        assert!(msg.contains("duplicate basin name"));
    }

    // Sanity-checks the overall shape of the generated schema document.
    #[test]
    fn json_schema_is_valid() {
        let schema = json_schema();
        assert!(schema.is_object());
        let schema_obj = schema.as_object().unwrap();

        // The `$schema` URI is whatever the schema generator emits by default.
        assert_eq!(
            schema_obj.get("$schema"),
            Some(&serde_json::Value::String(
                "https://json-schema.org/draft/2020-12/schema".to_string()
            ))
        );

        assert!(
            schema_obj.contains_key("properties"),
            "schema should have root properties"
        );

        assert!(
            schema_obj.contains_key("$defs"),
            "schema should have $defs for reusable definitions"
        );

        let properties = schema_obj.get("properties").unwrap().as_object().unwrap();
        assert!(
            properties.contains_key("basins"),
            "schema should include the `basins` property"
        );
    }

    // Set fields are converted, and omitted nested sections become their default configs.
    #[test]
    fn stream_config_conversion() {
        let spec = StreamConfigSpec {
            storage_class: Some(StorageClassSpec::Standard),
            retention_policy: Some(RetentionPolicySpec(RetentionPolicy::Infinite())),
            timestamping: None,
            delete_on_empty: None,
        };
        let config = OptionalStreamConfig::from(spec);
        assert_eq!(config.storage_class, Some(StorageClass::Standard));
        assert_eq!(config.retention_policy, Some(RetentionPolicy::Infinite()));
        assert_eq!(config.timestamping, OptionalTimestampingConfig::default());
        assert_eq!(
            config.delete_on_empty,
            OptionalDeleteOnEmptyConfig::default()
        );
    }
}
647}