Skip to main content

s2_lite/
init.rs

//! Declarative basin/stream initialization from a JSON spec file.
//!
//! Loaded at startup when `--init-file` / `S2LITE_INIT_FILE` is set.
4
5use std::{borrow::Cow, path::Path, time::Duration};
6
7use s2_common::{
8    maybe::Maybe,
9    types::{
10        basin::{BasinName, CreateBasinIntent},
11        config::{
12            BasinReconfiguration, DeleteOnEmptyReconfiguration, RetentionPolicy, StorageClass,
13            StreamReconfiguration, TimestampingMode, TimestampingReconfiguration,
14        },
15        stream::{CreateStreamIntent, StreamName},
16    },
17};
18use serde::{Deserialize, Serialize};
19use tracing::info;
20
21use crate::backend::Backend;
22
23#[derive(Debug, Deserialize, Default, schemars::JsonSchema)]
24pub struct ResourcesSpec {
25    #[serde(default)]
26    pub basins: Vec<BasinSpec>,
27}
28
29#[derive(Debug, Deserialize, schemars::JsonSchema)]
30#[serde(deny_unknown_fields)]
31pub struct BasinSpec {
32    pub name: String,
33    #[serde(default)]
34    pub config: Option<BasinConfigSpec>,
35    #[serde(default)]
36    pub streams: Vec<StreamSpec>,
37}
38
39#[derive(Debug, Deserialize, schemars::JsonSchema)]
40#[serde(deny_unknown_fields)]
41pub struct StreamSpec {
42    pub name: String,
43    #[serde(default)]
44    pub config: Option<StreamConfigSpec>,
45}
46
47#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
48#[serde(deny_unknown_fields)]
49pub struct BasinConfigSpec {
50    #[serde(default)]
51    pub default_stream_config: Option<StreamConfigSpec>,
52    /// Encryption algorithm to apply to newly created streams in the basin.
53    #[serde(default)]
54    pub stream_cipher: Option<EncryptionAlgorithmSpec>,
55    /// Create stream on append if it doesn't exist, using the default stream configuration.
56    #[serde(default)]
57    pub create_stream_on_append: Option<bool>,
58    /// Create stream on read if it doesn't exist, using the default stream configuration.
59    #[serde(default)]
60    pub create_stream_on_read: Option<bool>,
61}
62
63#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
64#[serde(deny_unknown_fields)]
65pub struct StreamConfigSpec {
66    /// Storage class for recent writes.
67    #[serde(default)]
68    pub storage_class: Option<StorageClassSpec>,
69    /// Retention policy for the stream. If unspecified, the default is to retain records for 7
70    /// days.
71    #[serde(default)]
72    pub retention_policy: Option<RetentionPolicySpec>,
73    /// Timestamping behavior.
74    #[serde(default)]
75    pub timestamping: Option<TimestampingSpec>,
76    /// Delete-on-empty configuration.
77    #[serde(default)]
78    pub delete_on_empty: Option<DeleteOnEmptySpec>,
79}
80
81#[derive(Debug, Clone, Deserialize, Serialize)]
82#[serde(rename_all = "kebab-case")]
83pub enum StorageClassSpec {
84    Standard,
85    Express,
86}
87
88impl schemars::JsonSchema for StorageClassSpec {
89    fn schema_name() -> Cow<'static, str> {
90        "StorageClassSpec".into()
91    }
92
93    fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
94        schemars::json_schema!({
95            "type": "string",
96            "description": "Storage class for recent writes.",
97            "enum": ["standard", "express"]
98        })
99    }
100}
101
102impl From<StorageClassSpec> for StorageClass {
103    fn from(s: StorageClassSpec) -> Self {
104        match s {
105            StorageClassSpec::Standard => StorageClass::Standard,
106            StorageClassSpec::Express => StorageClass::Express,
107        }
108    }
109}
110
111#[derive(Debug, Clone, Deserialize, Serialize)]
112pub enum EncryptionAlgorithmSpec {
113    #[serde(rename = "aegis-256")]
114    Aegis256,
115    #[serde(rename = "aes-256-gcm")]
116    Aes256Gcm,
117}
118
119impl schemars::JsonSchema for EncryptionAlgorithmSpec {
120    fn schema_name() -> Cow<'static, str> {
121        "EncryptionAlgorithmSpec".into()
122    }
123
124    fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
125        schemars::json_schema!({
126            "type": "string",
127            "description": "Encryption algorithm to apply to newly created streams in the basin.",
128            "enum": ["aegis-256", "aes-256-gcm"]
129        })
130    }
131}
132
133impl From<EncryptionAlgorithmSpec> for s2_common::encryption::EncryptionAlgorithm {
134    fn from(m: EncryptionAlgorithmSpec) -> Self {
135        match m {
136            EncryptionAlgorithmSpec::Aegis256 => Self::Aegis256,
137            EncryptionAlgorithmSpec::Aes256Gcm => Self::Aes256Gcm,
138        }
139    }
140}
141
142/// Accepts `"infinite"` or a humantime duration string such as `"7d"`, `"1w"`.
143#[derive(Debug, Clone, Copy)]
144pub struct RetentionPolicySpec(pub RetentionPolicy);
145
146impl RetentionPolicySpec {
147    pub fn age_secs(self) -> Option<u64> {
148        self.0.age().map(|d| d.as_secs())
149    }
150}
151
152impl TryFrom<String> for RetentionPolicySpec {
153    type Error = String;
154
155    fn try_from(s: String) -> Result<Self, Self::Error> {
156        if s.eq_ignore_ascii_case("infinite") {
157            return Ok(RetentionPolicySpec(RetentionPolicy::Infinite()));
158        }
159        let d = humantime::parse_duration(&s)
160            .map_err(|e| format!("invalid retention_policy {:?}: {}", s, e))?;
161        Ok(RetentionPolicySpec(RetentionPolicy::Age(d)))
162    }
163}
164
165impl<'de> Deserialize<'de> for RetentionPolicySpec {
166    fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
167        let s = String::deserialize(d)?;
168        RetentionPolicySpec::try_from(s).map_err(serde::de::Error::custom)
169    }
170}
171
172impl schemars::JsonSchema for RetentionPolicySpec {
173    fn schema_name() -> Cow<'static, str> {
174        "RetentionPolicySpec".into()
175    }
176
177    fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
178        schemars::json_schema!({
179            "type": "string",
180            "description": "Retain records unless explicitly trimmed (\"infinite\"), or automatically \
181                trim records older than the given duration (e.g. \"7days\", \"1week\").",
182            "examples": ["infinite", "7days", "1week"]
183        })
184    }
185}
186
187#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
188#[serde(deny_unknown_fields)]
189pub struct TimestampingSpec {
190    /// Timestamping mode for appends that influences how timestamps are handled.
191    #[serde(default)]
192    pub mode: Option<TimestampingModeSpec>,
193    /// Allow client-specified timestamps to exceed the arrival time.
194    /// If this is `false` or not set, client timestamps will be capped at the arrival time.
195    #[serde(default)]
196    pub uncapped: Option<bool>,
197}
198
199#[derive(Debug, Clone, Deserialize, Serialize)]
200#[serde(rename_all = "kebab-case")]
201pub enum TimestampingModeSpec {
202    ClientPrefer,
203    ClientRequire,
204    Arrival,
205}
206
207impl schemars::JsonSchema for TimestampingModeSpec {
208    fn schema_name() -> Cow<'static, str> {
209        "TimestampingModeSpec".into()
210    }
211
212    fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
213        schemars::json_schema!({
214            "type": "string",
215            "description": "Timestamping mode for appends that influences how timestamps are handled.",
216            "enum": ["client-prefer", "client-require", "arrival"]
217        })
218    }
219}
220
221impl From<TimestampingModeSpec> for TimestampingMode {
222    fn from(m: TimestampingModeSpec) -> Self {
223        match m {
224            TimestampingModeSpec::ClientPrefer => TimestampingMode::ClientPrefer,
225            TimestampingModeSpec::ClientRequire => TimestampingMode::ClientRequire,
226            TimestampingModeSpec::Arrival => TimestampingMode::Arrival,
227        }
228    }
229}
230
231#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
232#[serde(deny_unknown_fields)]
233pub struct DeleteOnEmptySpec {
234    /// Minimum age before an empty stream can be deleted.
235    /// Set to 0 (default) to disable delete-on-empty (don't delete automatically).
236    #[serde(default)]
237    pub min_age: Option<HumanDuration>,
238}
239
/// Newtype around [`Duration`] whose textual form is a humantime string
/// (e.g. `"1d"`, `"2h 30m"`); see the `TryFrom<String>` and `Deserialize` impls.
#[derive(Clone, Copy, Debug)]
pub struct HumanDuration(pub Duration);
243
244impl TryFrom<String> for HumanDuration {
245    type Error = String;
246
247    fn try_from(s: String) -> Result<Self, Self::Error> {
248        humantime::parse_duration(&s)
249            .map(HumanDuration)
250            .map_err(|e| format!("invalid duration {:?}: {}", s, e))
251    }
252}
253
254impl<'de> Deserialize<'de> for HumanDuration {
255    fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
256        let s = String::deserialize(d)?;
257        HumanDuration::try_from(s).map_err(serde::de::Error::custom)
258    }
259}
260
261impl schemars::JsonSchema for HumanDuration {
262    fn schema_name() -> Cow<'static, str> {
263        "HumanDuration".into()
264    }
265
266    fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
267        schemars::json_schema!({
268            "type": "string",
269            "description": "A duration string in humantime format, e.g. \"1day\", \"2h 30m\"",
270            "examples": ["1day", "2h 30m"]
271        })
272    }
273}
274
275impl From<BasinConfigSpec> for BasinReconfiguration {
276    fn from(s: BasinConfigSpec) -> Self {
277        BasinReconfiguration {
278            default_stream_config: s
279                .default_stream_config
280                .map(|dsc| Some(StreamReconfiguration::from(dsc)))
281                .map_or(Maybe::Unspecified, Maybe::Specified),
282            stream_cipher: s
283                .stream_cipher
284                .map(|algorithm| Some(algorithm.into()))
285                .map_or(Maybe::Unspecified, Maybe::Specified),
286            create_stream_on_append: s
287                .create_stream_on_append
288                .map_or(Maybe::Unspecified, Maybe::Specified),
289            create_stream_on_read: s
290                .create_stream_on_read
291                .map_or(Maybe::Unspecified, Maybe::Specified),
292        }
293    }
294}
295
296impl From<StreamConfigSpec> for StreamReconfiguration {
297    fn from(s: StreamConfigSpec) -> Self {
298        StreamReconfiguration {
299            storage_class: s
300                .storage_class
301                .map(|sc| Some(StorageClass::from(sc)))
302                .map_or(Maybe::Unspecified, Maybe::Specified),
303            retention_policy: s
304                .retention_policy
305                .map(|rp| Some(rp.0))
306                .map_or(Maybe::Unspecified, Maybe::Specified),
307            timestamping: s
308                .timestamping
309                .map(|ts| {
310                    Some(TimestampingReconfiguration {
311                        mode: ts
312                            .mode
313                            .map(|m| Some(TimestampingMode::from(m)))
314                            .map_or(Maybe::Unspecified, Maybe::Specified),
315                        uncapped: ts
316                            .uncapped
317                            .map(Some)
318                            .map_or(Maybe::Unspecified, Maybe::Specified),
319                    })
320                })
321                .map_or(Maybe::Unspecified, Maybe::Specified),
322            delete_on_empty: s
323                .delete_on_empty
324                .map(|doe| {
325                    Some(DeleteOnEmptyReconfiguration {
326                        min_age: doe
327                            .min_age
328                            .map(|h| Some(h.0))
329                            .map_or(Maybe::Unspecified, Maybe::Specified),
330                    })
331                })
332                .map_or(Maybe::Unspecified, Maybe::Specified),
333        }
334    }
335}
336
337pub fn json_schema() -> serde_json::Value {
338    serde_json::to_value(schemars::schema_for!(ResourcesSpec)).unwrap()
339}
340
341pub fn validate(spec: &ResourcesSpec) -> eyre::Result<()> {
342    let mut errors = Vec::new();
343    let mut seen_basins = std::collections::HashSet::new();
344
345    for basin_spec in &spec.basins {
346        if !seen_basins.insert(basin_spec.name.clone()) {
347            errors.push(format!("duplicate basin name {:?}", basin_spec.name));
348        }
349
350        if let Err(e) = basin_spec.name.parse::<BasinName>() {
351            errors.push(format!("invalid basin name {:?}: {}", basin_spec.name, e));
352            continue;
353        }
354
355        let mut seen_streams = std::collections::HashSet::new();
356        for stream_spec in &basin_spec.streams {
357            if !seen_streams.insert(stream_spec.name.clone()) {
358                errors.push(format!(
359                    "duplicate stream name {:?} in basin {:?}",
360                    stream_spec.name, basin_spec.name
361                ));
362            }
363            if let Err(e) = stream_spec.name.parse::<StreamName>() {
364                errors.push(format!(
365                    "invalid stream name {:?} in basin {:?}: {}",
366                    stream_spec.name, basin_spec.name, e
367                ));
368            }
369        }
370    }
371
372    if errors.is_empty() {
373        Ok(())
374    } else {
375        Err(eyre::eyre!("{}", errors.join("\n")))
376    }
377}
378
379pub fn load(path: &Path) -> eyre::Result<ResourcesSpec> {
380    let contents = std::fs::read_to_string(path)
381        .map_err(|e| eyre::eyre!("failed to read init file {:?}: {}", path, e))?;
382    let spec: ResourcesSpec = serde_json::from_str(&contents)
383        .map_err(|e| eyre::eyre!("failed to parse init file {:?}: {}", path, e))?;
384    Ok(spec)
385}
386
387pub async fn apply(backend: &Backend, spec: ResourcesSpec) -> eyre::Result<()> {
388    validate(&spec)?;
389
390    for basin_spec in spec.basins {
391        let basin: BasinName = basin_spec
392            .name
393            .parse()
394            .map_err(|e| eyre::eyre!("invalid basin name {:?}: {}", basin_spec.name, e))?;
395
396        let reconfiguration = basin_spec
397            .config
398            .map(BasinReconfiguration::from)
399            .unwrap_or_default();
400
401        backend
402            .create_basin(
403                basin.clone(),
404                CreateBasinIntent::CreateOrReconfigure { reconfiguration },
405            )
406            .await
407            .map_err(|e| eyre::eyre!("failed to apply basin {:?}: {}", basin.as_ref(), e))?;
408
409        info!(basin = basin.as_ref(), "basin applied");
410
411        for stream_spec in basin_spec.streams {
412            let stream: StreamName = stream_spec
413                .name
414                .parse()
415                .map_err(|e| eyre::eyre!("invalid stream name {:?}: {}", stream_spec.name, e))?;
416
417            let reconfiguration = stream_spec
418                .config
419                .map(StreamReconfiguration::from)
420                .unwrap_or_default();
421
422            backend
423                .create_stream(
424                    basin.clone(),
425                    stream.clone(),
426                    CreateStreamIntent::CreateOrReconfigure { reconfiguration },
427                )
428                .await
429                .map_err(|e| {
430                    eyre::eyre!(
431                        "failed to apply stream {:?}/{:?}: {}",
432                        basin.as_ref(),
433                        stream.as_ref(),
434                        e
435                    )
436                })?;
437
438            info!(
439                basin = basin.as_ref(),
440                stream = stream.as_ref(),
441                "stream applied"
442            );
443        }
444    }
445    Ok(())
446}
447
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes `json` into a [`ResourcesSpec`], panicking when it does not parse.
    fn parse_spec(json: &str) -> ResourcesSpec {
        serde_json::from_str::<ResourcesSpec>(json).expect("valid JSON")
    }

    /// Runs `validate` on `json` and returns the message of the error it must produce.
    fn validation_error(json: &str) -> String {
        validate(&parse_spec(json)).unwrap_err().to_string()
    }

    // An empty object is a valid spec with no basins.
    #[test]
    fn empty_spec() {
        let spec = parse_spec("{}");
        assert!(spec.basins.is_empty());
    }

    // `config` and `streams` are optional on a basin.
    #[test]
    fn basin_no_config() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin"}]}"#);
        assert_eq!(spec.basins.len(), 1);

        let basin = &spec.basins[0];
        assert_eq!(basin.name, "my-basin");
        assert!(basin.config.is_none());
        assert!(basin.streams.is_empty());
    }

    #[test]
    fn retention_policy_infinite() {
        let rp = serde_json::from_str::<RetentionPolicySpec>(r#""infinite""#).expect("deserialize");
        assert!(matches!(rp.0, RetentionPolicy::Infinite()));
    }

    #[test]
    fn retention_policy_duration() {
        let rp = serde_json::from_str::<RetentionPolicySpec>(r#""7days""#).expect("deserialize");
        let RetentionPolicy::Age(age) = rp.0 else {
            panic!("expected an age-based retention policy");
        };
        assert_eq!(age, Duration::from_secs(7 * 24 * 3600));
    }

    #[test]
    fn retention_policy_invalid() {
        let result = serde_json::from_str::<RetentionPolicySpec>(r#""not-a-duration""#);
        assert!(result.is_err());
    }

    #[test]
    fn human_duration() {
        let hd = serde_json::from_str::<HumanDuration>(r#""1day""#).expect("deserialize");
        assert_eq!(hd.0, Duration::from_secs(86400));
    }

    // Parses a spec that sets basin config, default stream config and one stream, then
    // checks every parsed field.
    #[test]
    fn full_spec_roundtrip() {
        let json = r#"
        {
          "basins": [
            {
              "name": "my-basin",
              "config": {
                "create_stream_on_append": true,
                "create_stream_on_read": false,
                "default_stream_config": {
                  "storage_class": "express",
                  "retention_policy": "7days",
                  "timestamping": {
                    "mode": "client-prefer",
                    "uncapped": false
                  },
                  "delete_on_empty": {
                    "min_age": "1day"
                  }
                }
              },
              "streams": [
                {
                  "name": "events",
                  "config": {
                    "storage_class": "standard",
                    "retention_policy": "infinite"
                  }
                }
              ]
            }
          ]
        }"#;

        let spec = parse_spec(json);
        assert_eq!(spec.basins.len(), 1);
        let basin = &spec.basins[0];
        assert_eq!(basin.name, "my-basin");

        // Basin-level flags.
        let basin_config = basin.config.as_ref().unwrap();
        assert_eq!(basin_config.create_stream_on_append, Some(true));
        assert_eq!(basin_config.create_stream_on_read, Some(false));

        // Default stream configuration.
        let defaults = basin_config.default_stream_config.as_ref().unwrap();
        assert!(matches!(
            defaults.storage_class,
            Some(StorageClassSpec::Express)
        ));
        assert!(matches!(
            defaults.retention_policy.as_ref().map(|policy| &policy.0),
            Some(RetentionPolicy::Age(_))
        ));

        let timestamping = defaults.timestamping.as_ref().unwrap();
        assert!(matches!(
            timestamping.mode,
            Some(TimestampingModeSpec::ClientPrefer)
        ));
        assert_eq!(timestamping.uncapped, Some(false));

        let delete_on_empty = defaults.delete_on_empty.as_ref().unwrap();
        assert_eq!(
            delete_on_empty.min_age.as_ref().map(|age| age.0),
            Some(Duration::from_secs(86400))
        );

        // The single explicitly listed stream.
        assert_eq!(basin.streams.len(), 1);
        let stream = &basin.streams[0];
        assert_eq!(stream.name, "events");

        let stream_config = stream.config.as_ref().unwrap();
        assert!(matches!(
            stream_config.storage_class,
            Some(StorageClassSpec::Standard)
        ));
        assert!(matches!(
            stream_config.retention_policy.as_ref().map(|policy| &policy.0),
            Some(RetentionPolicy::Infinite())
        ));
    }

    // Only the field that was set becomes `Specified`; the rest stay `Unspecified`.
    #[test]
    fn basin_config_conversion() {
        let spec = BasinConfigSpec {
            create_stream_on_append: Some(true),
            ..Default::default()
        };
        let reconfig = BasinReconfiguration::from(spec);

        assert!(matches!(
            reconfig.create_stream_on_append,
            Maybe::Specified(true)
        ));
        assert!(matches!(reconfig.create_stream_on_read, Maybe::Unspecified));
        assert!(matches!(reconfig.default_stream_config, Maybe::Unspecified));
    }

    #[test]
    fn validate_valid_spec() {
        let spec = parse_spec(
            r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"logs"}]}]}"#,
        );
        assert!(validate(&spec).is_ok());
    }

    #[test]
    fn validate_invalid_basin_name() {
        let msg = validation_error(r#"{"basins":[{"name":"INVALID_BASIN"}]}"#);
        assert!(msg.contains("invalid basin name"));
    }

    #[test]
    fn validate_invalid_stream_name() {
        let msg = validation_error(r#"{"basins":[{"name":"my-basin","streams":[{"name":""}]}]}"#);
        assert!(msg.contains("invalid stream name"));
    }

    #[test]
    fn validate_duplicate_basin_names() {
        let msg = validation_error(r#"{"basins":[{"name":"my-basin"},{"name":"my-basin"}]}"#);
        assert!(msg.contains("duplicate basin name"));
    }

    #[test]
    fn validate_duplicate_stream_names() {
        let msg = validation_error(
            r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"events"}]}]}"#,
        );
        assert!(msg.contains("duplicate stream name"));
    }

    // Both kinds of problem must be reported in the same error.
    #[test]
    fn validate_multiple_errors() {
        let msg = validation_error(r#"{"basins":[{"name":"INVALID"},{"name":"INVALID"}]}"#);
        assert!(msg.contains("invalid basin name"));
        assert!(msg.contains("duplicate basin name"));
    }

    #[test]
    fn json_schema_is_valid() {
        let schema = json_schema();
        let schema_obj = schema.as_object().expect("schema should be a JSON object");

        // using the default generated
        assert_eq!(
            schema_obj.get("$schema").and_then(|value| value.as_str()),
            Some("https://json-schema.org/draft/2020-12/schema")
        );

        assert!(
            schema_obj.contains_key("properties"),
            "schema should have root properties"
        );
        assert!(
            schema_obj.contains_key("$defs"),
            "schema should have $defs for reusable definitions"
        );

        let properties = schema_obj["properties"].as_object().unwrap();
        assert!(
            properties.contains_key("basins"),
            "schema should include the `basins` property"
        );
    }

    // Fields that were set become `Specified(Some(..))`; the rest stay `Unspecified`.
    #[test]
    fn stream_config_conversion() {
        let spec = StreamConfigSpec {
            storage_class: Some(StorageClassSpec::Standard),
            retention_policy: Some(RetentionPolicySpec(RetentionPolicy::Infinite())),
            ..Default::default()
        };
        let reconfig = StreamReconfiguration::from(spec);

        assert!(matches!(
            reconfig.storage_class,
            Maybe::Specified(Some(StorageClass::Standard))
        ));
        assert!(matches!(
            reconfig.retention_policy,
            Maybe::Specified(Some(RetentionPolicy::Infinite()))
        ));
        assert!(matches!(reconfig.timestamping, Maybe::Unspecified));
        assert!(matches!(reconfig.delete_on_empty, Maybe::Unspecified));
    }
}