Skip to main content

s2_lite/
init.rs

1//! Declarative basin/stream initialization from a JSON spec file.
2//!
3//! Loaded at startup when `--init-file` / `S2LITE_INIT_FILE` is set.
4
5use std::{borrow::Cow, path::Path, time::Duration};
6
7use s2_common::{
8    maybe::Maybe,
9    types::{
10        basin::BasinName,
11        config::{
12            BasinReconfiguration, DeleteOnEmptyReconfiguration, RetentionPolicy, StorageClass,
13            StreamReconfiguration, TimestampingMode, TimestampingReconfiguration,
14        },
15        resources::CreateMode,
16        stream::StreamName,
17    },
18};
19use serde::{Deserialize, Serialize};
20use tracing::info;
21
22use crate::backend::Backend;
23
// Root of the init spec file: the basins (and, nested inside them, the streams) to apply.
// Written as a plain `//` comment on purpose: `///` doc comments on a `JsonSchema`-derived
// item are emitted into the generated schema as descriptions.
// NOTE(review): unlike the nested spec structs, this one does not set `deny_unknown_fields`,
// so unrecognized top-level keys are ignored. Presumably this allows keys such as `$schema`;
// confirm it is intentional.
#[derive(Debug, Deserialize, Default, schemars::JsonSchema)]
pub struct ResourcesSpec {
    // A missing `basins` key deserializes to an empty list.
    #[serde(default)]
    pub basins: Vec<BasinSpec>,
}
29
// One basin entry in the init spec. Unknown keys are rejected at parse time.
// (`//` rather than `///`: doc comments on this derive end up in the generated JSON schema.)
#[derive(Debug, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct BasinSpec {
    // Raw basin name; checked against `BasinName` parsing in `validate` / `apply`.
    pub name: String,
    // Optional basin configuration; `None` when the key is absent.
    #[serde(default)]
    pub config: Option<BasinConfigSpec>,
    // Streams declared inside this basin; empty when the key is absent.
    #[serde(default)]
    pub streams: Vec<StreamSpec>,
}
39
// One stream entry inside a basin spec. Unknown keys are rejected at parse time.
// (`//` rather than `///`: doc comments on this derive end up in the generated JSON schema.)
#[derive(Debug, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct StreamSpec {
    // Raw stream name; checked against `StreamName` parsing in `validate` / `apply`.
    pub name: String,
    // Optional stream configuration; `None` when the key is absent.
    #[serde(default)]
    pub config: Option<StreamConfigSpec>,
}
47
// Basin-level settings from the init spec. Every field is optional; a field left out of the
// file is converted to `Maybe::Unspecified` by the `From` impl for `BasinReconfiguration`.
// (`//` rather than `///`: doc comments on this derive end up in the generated JSON schema,
// which is also why the existing `///` field docs below are left exactly as they are.)
#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct BasinConfigSpec {
    // Stream configuration spec attached to the basin as its default.
    #[serde(default)]
    pub default_stream_config: Option<StreamConfigSpec>,
    /// Encryption algorithm to apply to newly created streams in the basin.
    #[serde(default)]
    pub stream_cipher: Option<EncryptionAlgorithmSpec>,
    /// Create stream on append if it doesn't exist, using the default stream configuration.
    #[serde(default)]
    pub create_stream_on_append: Option<bool>,
    /// Create stream on read if it doesn't exist, using the default stream configuration.
    #[serde(default)]
    pub create_stream_on_read: Option<bool>,
}
63
// Stream-level settings from the init spec. Every field is optional; a field left out of the
// file is converted to `Maybe::Unspecified` by the `From` impl for `StreamReconfiguration`.
// (`//` rather than `///`: doc comments on this derive end up in the generated JSON schema,
// which is also why the existing `///` field docs below are left exactly as they are.)
#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct StreamConfigSpec {
    /// Storage class for recent writes.
    #[serde(default)]
    pub storage_class: Option<StorageClassSpec>,
    /// Retention policy for the stream. If unspecified, the default is to retain records for 7
    /// days.
    #[serde(default)]
    pub retention_policy: Option<RetentionPolicySpec>,
    /// Timestamping behavior.
    #[serde(default)]
    pub timestamping: Option<TimestampingSpec>,
    /// Delete-on-empty configuration.
    #[serde(default)]
    pub delete_on_empty: Option<DeleteOnEmptySpec>,
}
81
/// Spec-file representation of a storage class.
///
/// Variants are (de)serialized in kebab-case, i.e. `"standard"` and `"express"`.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum StorageClassSpec {
    Standard,
    Express,
}
88
89impl schemars::JsonSchema for StorageClassSpec {
90    fn schema_name() -> Cow<'static, str> {
91        "StorageClassSpec".into()
92    }
93
94    fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
95        schemars::json_schema!({
96            "type": "string",
97            "description": "Storage class for recent writes.",
98            "enum": ["standard", "express"]
99        })
100    }
101}
102
103impl From<StorageClassSpec> for StorageClass {
104    fn from(s: StorageClassSpec) -> Self {
105        match s {
106            StorageClassSpec::Standard => StorageClass::Standard,
107            StorageClassSpec::Express => StorageClass::Express,
108        }
109    }
110}
111
/// Spec-file representation of a stream encryption algorithm.
///
/// Each variant carries an explicit serde name: `"aegis-256"` and `"aes-256-gcm"`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub enum EncryptionAlgorithmSpec {
    #[serde(rename = "aegis-256")]
    Aegis256,
    #[serde(rename = "aes-256-gcm")]
    Aes256Gcm,
}
119
120impl schemars::JsonSchema for EncryptionAlgorithmSpec {
121    fn schema_name() -> Cow<'static, str> {
122        "EncryptionAlgorithmSpec".into()
123    }
124
125    fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
126        schemars::json_schema!({
127            "type": "string",
128            "description": "Encryption algorithm to apply to newly created streams in the basin.",
129            "enum": ["aegis-256", "aes-256-gcm"]
130        })
131    }
132}
133
134impl From<EncryptionAlgorithmSpec> for s2_common::encryption::EncryptionAlgorithm {
135    fn from(m: EncryptionAlgorithmSpec) -> Self {
136        match m {
137            EncryptionAlgorithmSpec::Aegis256 => Self::Aegis256,
138            EncryptionAlgorithmSpec::Aes256Gcm => Self::Aes256Gcm,
139        }
140    }
141}
142
/// Accepts `"infinite"` or a humantime duration string such as `"7d"`, `"1w"`.
///
/// Newtype over [`RetentionPolicy`] so that deserialization and the JSON schema can be
/// implemented by hand for the string form used in the spec file.
#[derive(Debug, Clone, Copy)]
pub struct RetentionPolicySpec(pub RetentionPolicy);
146
147impl RetentionPolicySpec {
148    pub fn age_secs(self) -> Option<u64> {
149        self.0.age().map(|d| d.as_secs())
150    }
151}
152
153impl TryFrom<String> for RetentionPolicySpec {
154    type Error = String;
155
156    fn try_from(s: String) -> Result<Self, Self::Error> {
157        if s.eq_ignore_ascii_case("infinite") {
158            return Ok(RetentionPolicySpec(RetentionPolicy::Infinite()));
159        }
160        let d = humantime::parse_duration(&s)
161            .map_err(|e| format!("invalid retention_policy {:?}: {}", s, e))?;
162        Ok(RetentionPolicySpec(RetentionPolicy::Age(d)))
163    }
164}
165
166impl<'de> Deserialize<'de> for RetentionPolicySpec {
167    fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
168        let s = String::deserialize(d)?;
169        RetentionPolicySpec::try_from(s).map_err(serde::de::Error::custom)
170    }
171}
172
173impl schemars::JsonSchema for RetentionPolicySpec {
174    fn schema_name() -> Cow<'static, str> {
175        "RetentionPolicySpec".into()
176    }
177
178    fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
179        schemars::json_schema!({
180            "type": "string",
181            "description": "Retain records unless explicitly trimmed (\"infinite\"), or automatically \
182                trim records older than the given duration (e.g. \"7days\", \"1week\").",
183            "examples": ["infinite", "7days", "1week"]
184        })
185    }
186}
187
// Timestamping settings of a stream config spec. Both fields are optional and unknown keys
// are rejected at parse time.
// (`//` rather than `///`: doc comments on this derive end up in the generated JSON schema,
// which is also why the existing `///` field docs below are left exactly as they are.)
#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct TimestampingSpec {
    /// Timestamping mode for appends that influences how timestamps are handled.
    #[serde(default)]
    pub mode: Option<TimestampingModeSpec>,
    /// Allow client-specified timestamps to exceed the arrival time.
    /// If this is `false` or not set, client timestamps will be capped at the arrival time.
    #[serde(default)]
    pub uncapped: Option<bool>,
}
199
/// Spec-file representation of a timestamping mode.
///
/// Variants are (de)serialized in kebab-case: `"client-prefer"`, `"client-require"` and
/// `"arrival"`.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum TimestampingModeSpec {
    ClientPrefer,
    ClientRequire,
    Arrival,
}
207
208impl schemars::JsonSchema for TimestampingModeSpec {
209    fn schema_name() -> Cow<'static, str> {
210        "TimestampingModeSpec".into()
211    }
212
213    fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
214        schemars::json_schema!({
215            "type": "string",
216            "description": "Timestamping mode for appends that influences how timestamps are handled.",
217            "enum": ["client-prefer", "client-require", "arrival"]
218        })
219    }
220}
221
222impl From<TimestampingModeSpec> for TimestampingMode {
223    fn from(m: TimestampingModeSpec) -> Self {
224        match m {
225            TimestampingModeSpec::ClientPrefer => TimestampingMode::ClientPrefer,
226            TimestampingModeSpec::ClientRequire => TimestampingMode::ClientRequire,
227            TimestampingModeSpec::Arrival => TimestampingMode::Arrival,
228        }
229    }
230}
231
// Delete-on-empty settings of a stream config spec. Unknown keys are rejected at parse time.
// (`//` rather than `///`: doc comments on this derive end up in the generated JSON schema,
// which is also why the existing `///` field docs below are left exactly as they are.)
#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct DeleteOnEmptySpec {
    /// Minimum age before an empty stream can be deleted.
    /// Set to 0 (default) to disable delete-on-empty (don't delete automatically).
    #[serde(default)]
    pub min_age: Option<HumanDuration>,
}
240
/// A `std::time::Duration` deserialized from a humantime string (e.g. `"1d"`, `"2h 30m"`).
///
/// Newtype so that deserialization and the JSON schema can be implemented by hand for the
/// string form used in the spec file.
#[derive(Debug, Clone, Copy)]
pub struct HumanDuration(pub Duration);
244
245impl TryFrom<String> for HumanDuration {
246    type Error = String;
247
248    fn try_from(s: String) -> Result<Self, Self::Error> {
249        humantime::parse_duration(&s)
250            .map(HumanDuration)
251            .map_err(|e| format!("invalid duration {:?}: {}", s, e))
252    }
253}
254
255impl<'de> Deserialize<'de> for HumanDuration {
256    fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
257        let s = String::deserialize(d)?;
258        HumanDuration::try_from(s).map_err(serde::de::Error::custom)
259    }
260}
261
262impl schemars::JsonSchema for HumanDuration {
263    fn schema_name() -> Cow<'static, str> {
264        "HumanDuration".into()
265    }
266
267    fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
268        schemars::json_schema!({
269            "type": "string",
270            "description": "A duration string in humantime format, e.g. \"1day\", \"2h 30m\"",
271            "examples": ["1day", "2h 30m"]
272        })
273    }
274}
275
276impl From<BasinConfigSpec> for BasinReconfiguration {
277    fn from(s: BasinConfigSpec) -> Self {
278        BasinReconfiguration {
279            default_stream_config: s
280                .default_stream_config
281                .map(|dsc| Some(StreamReconfiguration::from(dsc)))
282                .map_or(Maybe::Unspecified, Maybe::Specified),
283            stream_cipher: s
284                .stream_cipher
285                .map(|algorithm| Some(algorithm.into()))
286                .map_or(Maybe::Unspecified, Maybe::Specified),
287            create_stream_on_append: s
288                .create_stream_on_append
289                .map_or(Maybe::Unspecified, Maybe::Specified),
290            create_stream_on_read: s
291                .create_stream_on_read
292                .map_or(Maybe::Unspecified, Maybe::Specified),
293        }
294    }
295}
296
297impl From<StreamConfigSpec> for StreamReconfiguration {
298    fn from(s: StreamConfigSpec) -> Self {
299        StreamReconfiguration {
300            storage_class: s
301                .storage_class
302                .map(|sc| Some(StorageClass::from(sc)))
303                .map_or(Maybe::Unspecified, Maybe::Specified),
304            retention_policy: s
305                .retention_policy
306                .map(|rp| Some(rp.0))
307                .map_or(Maybe::Unspecified, Maybe::Specified),
308            timestamping: s
309                .timestamping
310                .map(|ts| {
311                    Some(TimestampingReconfiguration {
312                        mode: ts
313                            .mode
314                            .map(|m| Some(TimestampingMode::from(m)))
315                            .map_or(Maybe::Unspecified, Maybe::Specified),
316                        uncapped: ts
317                            .uncapped
318                            .map(Some)
319                            .map_or(Maybe::Unspecified, Maybe::Specified),
320                    })
321                })
322                .map_or(Maybe::Unspecified, Maybe::Specified),
323            delete_on_empty: s
324                .delete_on_empty
325                .map(|doe| {
326                    Some(DeleteOnEmptyReconfiguration {
327                        min_age: doe
328                            .min_age
329                            .map(|h| Some(h.0))
330                            .map_or(Maybe::Unspecified, Maybe::Specified),
331                    })
332                })
333                .map_or(Maybe::Unspecified, Maybe::Specified),
334        }
335    }
336}
337
338pub fn json_schema() -> serde_json::Value {
339    serde_json::to_value(schemars::schema_for!(ResourcesSpec)).unwrap()
340}
341
342pub fn validate(spec: &ResourcesSpec) -> eyre::Result<()> {
343    let mut errors = Vec::new();
344    let mut seen_basins = std::collections::HashSet::new();
345
346    for basin_spec in &spec.basins {
347        if !seen_basins.insert(basin_spec.name.clone()) {
348            errors.push(format!("duplicate basin name {:?}", basin_spec.name));
349        }
350
351        if let Err(e) = basin_spec.name.parse::<BasinName>() {
352            errors.push(format!("invalid basin name {:?}: {}", basin_spec.name, e));
353            continue;
354        }
355
356        let mut seen_streams = std::collections::HashSet::new();
357        for stream_spec in &basin_spec.streams {
358            if !seen_streams.insert(stream_spec.name.clone()) {
359                errors.push(format!(
360                    "duplicate stream name {:?} in basin {:?}",
361                    stream_spec.name, basin_spec.name
362                ));
363            }
364            if let Err(e) = stream_spec.name.parse::<StreamName>() {
365                errors.push(format!(
366                    "invalid stream name {:?} in basin {:?}: {}",
367                    stream_spec.name, basin_spec.name, e
368                ));
369            }
370        }
371    }
372
373    if errors.is_empty() {
374        Ok(())
375    } else {
376        Err(eyre::eyre!("{}", errors.join("\n")))
377    }
378}
379
380pub fn load(path: &Path) -> eyre::Result<ResourcesSpec> {
381    let contents = std::fs::read_to_string(path)
382        .map_err(|e| eyre::eyre!("failed to read init file {:?}: {}", path, e))?;
383    let spec: ResourcesSpec = serde_json::from_str(&contents)
384        .map_err(|e| eyre::eyre!("failed to parse init file {:?}: {}", path, e))?;
385    Ok(spec)
386}
387
388pub async fn apply(backend: &Backend, spec: ResourcesSpec) -> eyre::Result<()> {
389    validate(&spec)?;
390
391    for basin_spec in spec.basins {
392        let basin: BasinName = basin_spec
393            .name
394            .parse()
395            .map_err(|e| eyre::eyre!("invalid basin name {:?}: {}", basin_spec.name, e))?;
396
397        let reconfiguration = basin_spec
398            .config
399            .map(BasinReconfiguration::from)
400            .unwrap_or_default();
401
402        backend
403            .create_basin(
404                basin.clone(),
405                reconfiguration,
406                CreateMode::CreateOrReconfigure,
407            )
408            .await
409            .map_err(|e| eyre::eyre!("failed to apply basin {:?}: {}", basin.as_ref(), e))?;
410
411        info!(basin = basin.as_ref(), "basin applied");
412
413        for stream_spec in basin_spec.streams {
414            let stream: StreamName = stream_spec
415                .name
416                .parse()
417                .map_err(|e| eyre::eyre!("invalid stream name {:?}: {}", stream_spec.name, e))?;
418
419            let reconfiguration = stream_spec
420                .config
421                .map(StreamReconfiguration::from)
422                .unwrap_or_default();
423
424            backend
425                .create_stream(
426                    basin.clone(),
427                    stream.clone(),
428                    reconfiguration,
429                    CreateMode::CreateOrReconfigure,
430                )
431                .await
432                .map_err(|e| {
433                    eyre::eyre!(
434                        "failed to apply stream {:?}/{:?}: {}",
435                        basin.as_ref(),
436                        stream.as_ref(),
437                        e
438                    )
439                })?;
440
441            info!(
442                basin = basin.as_ref(),
443                stream = stream.as_ref(),
444                "stream applied"
445            );
446        }
447    }
448    Ok(())
449}
450
// Unit tests for spec parsing, spec -> reconfiguration conversion, validation and schema
// generation. None of them touch a backend.
#[cfg(test)]
mod tests {
    use super::*;

    // Parses a spec from a JSON literal, panicking if the literal is not a valid spec.
    fn parse_spec(json: &str) -> ResourcesSpec {
        serde_json::from_str(json).expect("valid JSON")
    }

    // An empty JSON object is a valid spec with no basins.
    #[test]
    fn empty_spec() {
        let spec = parse_spec("{}");
        assert!(spec.basins.is_empty());
    }

    // A basin with only a name has no config and no streams.
    #[test]
    fn basin_no_config() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin"}]}"#);
        assert_eq!(spec.basins.len(), 1);
        assert_eq!(spec.basins[0].name, "my-basin");
        assert!(spec.basins[0].config.is_none());
        assert!(spec.basins[0].streams.is_empty());
    }

    // The literal "infinite" parses to the infinite retention policy.
    #[test]
    fn retention_policy_infinite() {
        let rp: RetentionPolicySpec = serde_json::from_str(r#""infinite""#).expect("deserialize");
        assert!(matches!(rp.0, RetentionPolicy::Infinite()));
    }

    // A humantime duration parses to an age-based policy with that exact duration.
    #[test]
    fn retention_policy_duration() {
        let rp: RetentionPolicySpec = serde_json::from_str(r#""7days""#).expect("deserialize");
        assert!(matches!(rp.0, RetentionPolicy::Age(_)));
        if let RetentionPolicy::Age(d) = rp.0 {
            assert_eq!(d, Duration::from_secs(7 * 24 * 3600));
        }
    }

    // A string that is neither "infinite" nor a duration is rejected.
    #[test]
    fn retention_policy_invalid() {
        let err = serde_json::from_str::<RetentionPolicySpec>(r#""not-a-duration""#);
        assert!(err.is_err());
    }

    // `HumanDuration` parses a humantime string into the matching `Duration`.
    #[test]
    fn human_duration() {
        let hd: HumanDuration = serde_json::from_str(r#""1day""#).expect("deserialize");
        assert_eq!(hd.0, Duration::from_secs(86400));
    }

    // Parses a spec that sets most basin and stream fields and checks each parsed value.
    // (`stream_cipher` is not set in this fixture.)
    #[test]
    fn full_spec_roundtrip() {
        let json = r#"
        {
          "basins": [
            {
              "name": "my-basin",
              "config": {
                "create_stream_on_append": true,
                "create_stream_on_read": false,
                "default_stream_config": {
                  "storage_class": "express",
                  "retention_policy": "7days",
                  "timestamping": {
                    "mode": "client-prefer",
                    "uncapped": false
                  },
                  "delete_on_empty": {
                    "min_age": "1day"
                  }
                }
              },
              "streams": [
                {
                  "name": "events",
                  "config": {
                    "storage_class": "standard",
                    "retention_policy": "infinite"
                  }
                }
              ]
            }
          ]
        }"#;

        let spec = parse_spec(json);
        assert_eq!(spec.basins.len(), 1);
        let basin = &spec.basins[0];
        assert_eq!(basin.name, "my-basin");

        let config = basin.config.as_ref().unwrap();
        assert_eq!(config.create_stream_on_append, Some(true));
        assert_eq!(config.create_stream_on_read, Some(false));

        let dsc = config.default_stream_config.as_ref().unwrap();
        assert!(matches!(dsc.storage_class, Some(StorageClassSpec::Express)));
        assert!(matches!(
            dsc.retention_policy.as_ref().map(|r| &r.0),
            Some(RetentionPolicy::Age(_))
        ));

        let ts = dsc.timestamping.as_ref().unwrap();
        assert!(matches!(ts.mode, Some(TimestampingModeSpec::ClientPrefer)));
        assert_eq!(ts.uncapped, Some(false));

        let doe = dsc.delete_on_empty.as_ref().unwrap();
        assert_eq!(
            doe.min_age.as_ref().map(|h| h.0),
            Some(Duration::from_secs(86400))
        );

        assert_eq!(basin.streams.len(), 1);
        let stream = &basin.streams[0];
        assert_eq!(stream.name, "events");
        let sc = stream.config.as_ref().unwrap();
        assert!(matches!(sc.storage_class, Some(StorageClassSpec::Standard)));
        assert!(matches!(
            sc.retention_policy.as_ref().map(|r| &r.0),
            Some(RetentionPolicy::Infinite())
        ));
    }

    // Set fields convert to `Maybe::Specified`; unset fields convert to `Maybe::Unspecified`.
    #[test]
    fn basin_config_conversion() {
        let spec = BasinConfigSpec {
            default_stream_config: None,
            stream_cipher: None,
            create_stream_on_append: Some(true),
            create_stream_on_read: None,
        };
        let reconfig = BasinReconfiguration::from(spec);
        assert!(matches!(
            reconfig.create_stream_on_append,
            Maybe::Specified(true)
        ));
        assert!(matches!(reconfig.create_stream_on_read, Maybe::Unspecified));
        assert!(matches!(reconfig.default_stream_config, Maybe::Unspecified));
    }

    // A spec with valid, unique names passes validation.
    #[test]
    fn validate_valid_spec() {
        let spec = parse_spec(
            r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"logs"}]}]}"#,
        );
        assert!(validate(&spec).is_ok());
    }

    // A basin name rejected by `BasinName` parsing is reported.
    #[test]
    fn validate_invalid_basin_name() {
        let spec = parse_spec(r#"{"basins":[{"name":"INVALID_BASIN"}]}"#);
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("invalid basin name"));
    }

    // An empty stream name is reported as invalid.
    #[test]
    fn validate_invalid_stream_name() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin","streams":[{"name":""}]}]}"#);
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("invalid stream name"));
    }

    // Two basins with the same name are reported as duplicates.
    #[test]
    fn validate_duplicate_basin_names() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin"},{"name":"my-basin"}]}"#);
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("duplicate basin name"));
    }

    // Two streams with the same name inside one basin are reported as duplicates.
    #[test]
    fn validate_duplicate_stream_names() {
        let spec = parse_spec(
            r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"events"}]}]}"#,
        );
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("duplicate stream name"));
    }

    // Validation accumulates errors: an invalid name that is also duplicated yields both
    // messages in the same error.
    #[test]
    fn validate_multiple_errors() {
        let spec = parse_spec(r#"{"basins":[{"name":"INVALID"},{"name":"INVALID"}]}"#);
        let err = validate(&spec).unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("invalid basin name"));
        assert!(msg.contains("duplicate basin name"));
    }

    // The generated schema is an object declaring the expected dialect, root properties
    // (including `basins`) and shared definitions.
    #[test]
    fn json_schema_is_valid() {
        let schema = json_schema();
        assert!(schema.is_object());
        let schema_obj = schema.as_object().unwrap();

        // using the default generated
        assert_eq!(
            schema_obj.get("$schema"),
            Some(&serde_json::Value::String(
                "https://json-schema.org/draft/2020-12/schema".to_string()
            ))
        );

        assert!(
            schema_obj.contains_key("properties"),
            "schema should have root properties"
        );

        assert!(
            schema_obj.contains_key("$defs"),
            "schema should have $defs for reusable definitions"
        );

        let properties = schema_obj.get("properties").unwrap().as_object().unwrap();
        assert!(
            properties.contains_key("basins"),
            "schema should include the `basins` property"
        );
    }

    // Set fields convert to `Maybe::Specified(Some(..))`; unset fields convert to
    // `Maybe::Unspecified`.
    #[test]
    fn stream_config_conversion() {
        let spec = StreamConfigSpec {
            storage_class: Some(StorageClassSpec::Standard),
            retention_policy: Some(RetentionPolicySpec(RetentionPolicy::Infinite())),
            timestamping: None,
            delete_on_empty: None,
        };
        let reconfig = StreamReconfiguration::from(spec);
        assert!(matches!(
            reconfig.storage_class,
            Maybe::Specified(Some(StorageClass::Standard))
        ));
        assert!(matches!(
            reconfig.retention_policy,
            Maybe::Specified(Some(RetentionPolicy::Infinite()))
        ));
        assert!(matches!(reconfig.timestamping, Maybe::Unspecified));
        assert!(matches!(reconfig.delete_on_empty, Maybe::Unspecified));
    }
}