Skip to main content

s2_lite/
init.rs

1//! Declarative basin/stream initialization from a JSON spec file.
2//!
3//! Loaded at startup when `--init-file` / `S2LITE_INIT_FILE` is set.
4
5use std::{path::Path, time::Duration};
6
7use s2_common::{
8    maybe::Maybe,
9    types::{
10        basin::BasinName,
11        config::{
12            BasinReconfiguration, DeleteOnEmptyReconfiguration, RetentionPolicy, StorageClass,
13            StreamReconfiguration, TimestampingMode, TimestampingReconfiguration,
14        },
15        resources::CreateMode,
16        stream::StreamName,
17    },
18};
19use serde::{Deserialize, Serialize};
20use tracing::info;
21
22use crate::backend::Backend;
23
24#[derive(Debug, Deserialize, Default, schemars::JsonSchema)]
25pub struct ResourcesSpec {
26    #[serde(default)]
27    pub basins: Vec<BasinSpec>,
28}
29
30#[derive(Debug, Deserialize, schemars::JsonSchema)]
31#[serde(deny_unknown_fields)]
32pub struct BasinSpec {
33    pub name: String,
34    #[serde(default)]
35    pub config: Option<BasinConfigSpec>,
36    #[serde(default)]
37    pub streams: Vec<StreamSpec>,
38}
39
40#[derive(Debug, Deserialize, schemars::JsonSchema)]
41#[serde(deny_unknown_fields)]
42pub struct StreamSpec {
43    pub name: String,
44    #[serde(default)]
45    pub config: Option<StreamConfigSpec>,
46}
47
48#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
49#[serde(deny_unknown_fields)]
50pub struct BasinConfigSpec {
51    #[serde(default)]
52    pub default_stream_config: Option<StreamConfigSpec>,
53    /// Create stream on append if it doesn't exist, using the default stream configuration.
54    #[serde(default)]
55    pub create_stream_on_append: Option<bool>,
56    /// Create stream on read if it doesn't exist, using the default stream configuration.
57    #[serde(default)]
58    pub create_stream_on_read: Option<bool>,
59}
60
61#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
62#[serde(deny_unknown_fields)]
63pub struct StreamConfigSpec {
64    /// Storage class for recent writes.
65    #[serde(default)]
66    pub storage_class: Option<StorageClassSpec>,
67    /// Retention policy for the stream. If unspecified, the default is to retain records for 7
68    /// days.
69    #[serde(default)]
70    pub retention_policy: Option<RetentionPolicySpec>,
71    /// Timestamping behavior.
72    #[serde(default)]
73    pub timestamping: Option<TimestampingSpec>,
74    /// Delete-on-empty configuration.
75    #[serde(default)]
76    pub delete_on_empty: Option<DeleteOnEmptySpec>,
77}
78
79#[derive(Debug, Clone, Deserialize, Serialize)]
80#[serde(rename_all = "kebab-case")]
81pub enum StorageClassSpec {
82    Standard,
83    Express,
84}
85
86impl schemars::JsonSchema for StorageClassSpec {
87    fn schema_name() -> String {
88        "StorageClassSpec".to_string()
89    }
90
91    fn json_schema(_: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
92        schemars::schema::Schema::Object(schemars::schema::SchemaObject {
93            instance_type: Some(schemars::schema::InstanceType::String.into()),
94            metadata: Some(Box::new(schemars::schema::Metadata {
95                description: Some("Storage class for recent writes.".to_string()),
96                ..Default::default()
97            })),
98            enum_values: Some(vec![
99                serde_json::Value::String("standard".to_string()),
100                serde_json::Value::String("express".to_string()),
101            ]),
102            ..Default::default()
103        })
104    }
105}
106
107impl From<StorageClassSpec> for StorageClass {
108    fn from(s: StorageClassSpec) -> Self {
109        match s {
110            StorageClassSpec::Standard => StorageClass::Standard,
111            StorageClassSpec::Express => StorageClass::Express,
112        }
113    }
114}
115
116/// Accepts `"infinite"` or a humantime duration string such as `"7d"`, `"1w"`.
117#[derive(Debug, Clone, Copy)]
118pub struct RetentionPolicySpec(pub RetentionPolicy);
119
120impl RetentionPolicySpec {
121    pub fn age_secs(self) -> Option<u64> {
122        self.0.age().map(|d| d.as_secs())
123    }
124}
125
126impl TryFrom<String> for RetentionPolicySpec {
127    type Error = String;
128
129    fn try_from(s: String) -> Result<Self, Self::Error> {
130        if s.eq_ignore_ascii_case("infinite") {
131            return Ok(RetentionPolicySpec(RetentionPolicy::Infinite()));
132        }
133        let d = humantime::parse_duration(&s)
134            .map_err(|e| format!("invalid retention_policy {:?}: {}", s, e))?;
135        Ok(RetentionPolicySpec(RetentionPolicy::Age(d)))
136    }
137}
138
139impl<'de> Deserialize<'de> for RetentionPolicySpec {
140    fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
141        let s = String::deserialize(d)?;
142        RetentionPolicySpec::try_from(s).map_err(serde::de::Error::custom)
143    }
144}
145
146impl schemars::JsonSchema for RetentionPolicySpec {
147    fn schema_name() -> String {
148        "RetentionPolicySpec".to_string()
149    }
150
151    fn json_schema(_: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
152        schemars::schema::Schema::Object(schemars::schema::SchemaObject {
153            instance_type: Some(schemars::schema::InstanceType::String.into()),
154            metadata: Some(Box::new(schemars::schema::Metadata {
155                description: Some(
156                    "Retain records unless explicitly trimmed (\"infinite\"), or automatically \
157                     trim records older than the given duration (e.g. \"7days\", \"1week\")."
158                        .to_string(),
159                ),
160                examples: vec![
161                    serde_json::Value::String("infinite".to_string()),
162                    serde_json::Value::String("7days".to_string()),
163                    serde_json::Value::String("1week".to_string()),
164                ],
165                ..Default::default()
166            })),
167            ..Default::default()
168        })
169    }
170}
171
172#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
173#[serde(deny_unknown_fields)]
174pub struct TimestampingSpec {
175    /// Timestamping mode for appends that influences how timestamps are handled.
176    #[serde(default)]
177    pub mode: Option<TimestampingModeSpec>,
178    /// Allow client-specified timestamps to exceed the arrival time.
179    /// If this is `false` or not set, client timestamps will be capped at the arrival time.
180    #[serde(default)]
181    pub uncapped: Option<bool>,
182}
183
184#[derive(Debug, Clone, Deserialize, Serialize)]
185#[serde(rename_all = "kebab-case")]
186pub enum TimestampingModeSpec {
187    ClientPrefer,
188    ClientRequire,
189    Arrival,
190}
191
192impl schemars::JsonSchema for TimestampingModeSpec {
193    fn schema_name() -> String {
194        "TimestampingModeSpec".to_string()
195    }
196
197    fn json_schema(_: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
198        schemars::schema::Schema::Object(schemars::schema::SchemaObject {
199            instance_type: Some(schemars::schema::InstanceType::String.into()),
200            metadata: Some(Box::new(schemars::schema::Metadata {
201                description: Some(
202                    "Timestamping mode for appends that influences how timestamps are handled."
203                        .to_string(),
204                ),
205                ..Default::default()
206            })),
207            enum_values: Some(vec![
208                serde_json::Value::String("client-prefer".to_string()),
209                serde_json::Value::String("client-require".to_string()),
210                serde_json::Value::String("arrival".to_string()),
211            ]),
212            ..Default::default()
213        })
214    }
215}
216
217impl From<TimestampingModeSpec> for TimestampingMode {
218    fn from(m: TimestampingModeSpec) -> Self {
219        match m {
220            TimestampingModeSpec::ClientPrefer => TimestampingMode::ClientPrefer,
221            TimestampingModeSpec::ClientRequire => TimestampingMode::ClientRequire,
222            TimestampingModeSpec::Arrival => TimestampingMode::Arrival,
223        }
224    }
225}
226
227#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
228#[serde(deny_unknown_fields)]
229pub struct DeleteOnEmptySpec {
230    /// Minimum age before an empty stream can be deleted.
231    /// Set to 0 (default) to disable delete-on-empty (don't delete automatically).
232    #[serde(default)]
233    pub min_age: Option<HumanDuration>,
234}
235
/// Wrapper around [`Duration`] that deserializes from a humantime string such as `"1d"` or
/// `"2h 30m"`.
#[derive(Clone, Copy, Debug)]
pub struct HumanDuration(pub Duration);
239
240impl TryFrom<String> for HumanDuration {
241    type Error = String;
242
243    fn try_from(s: String) -> Result<Self, Self::Error> {
244        humantime::parse_duration(&s)
245            .map(HumanDuration)
246            .map_err(|e| format!("invalid duration {:?}: {}", s, e))
247    }
248}
249
250impl<'de> Deserialize<'de> for HumanDuration {
251    fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
252        let s = String::deserialize(d)?;
253        HumanDuration::try_from(s).map_err(serde::de::Error::custom)
254    }
255}
256
257impl schemars::JsonSchema for HumanDuration {
258    fn schema_name() -> String {
259        "HumanDuration".to_string()
260    }
261
262    fn json_schema(_: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
263        schemars::schema::Schema::Object(schemars::schema::SchemaObject {
264            instance_type: Some(schemars::schema::InstanceType::String.into()),
265            metadata: Some(Box::new(schemars::schema::Metadata {
266                description: Some(
267                    "A duration string in humantime format, e.g. \"1day\", \"2h 30m\"".to_string(),
268                ),
269                examples: vec![
270                    serde_json::Value::String("1day".to_string()),
271                    serde_json::Value::String("2h 30m".to_string()),
272                ],
273                ..Default::default()
274            })),
275            ..Default::default()
276        })
277    }
278}
279
280impl From<BasinConfigSpec> for BasinReconfiguration {
281    fn from(s: BasinConfigSpec) -> Self {
282        BasinReconfiguration {
283            default_stream_config: s
284                .default_stream_config
285                .map(|dsc| Some(StreamReconfiguration::from(dsc)))
286                .map_or(Maybe::Unspecified, Maybe::Specified),
287            create_stream_on_append: s
288                .create_stream_on_append
289                .map_or(Maybe::Unspecified, Maybe::Specified),
290            create_stream_on_read: s
291                .create_stream_on_read
292                .map_or(Maybe::Unspecified, Maybe::Specified),
293        }
294    }
295}
296
297impl From<StreamConfigSpec> for StreamReconfiguration {
298    fn from(s: StreamConfigSpec) -> Self {
299        StreamReconfiguration {
300            storage_class: s
301                .storage_class
302                .map(|sc| Some(StorageClass::from(sc)))
303                .map_or(Maybe::Unspecified, Maybe::Specified),
304            retention_policy: s
305                .retention_policy
306                .map(|rp| Some(rp.0))
307                .map_or(Maybe::Unspecified, Maybe::Specified),
308            timestamping: s
309                .timestamping
310                .map(|ts| {
311                    Some(TimestampingReconfiguration {
312                        mode: ts
313                            .mode
314                            .map(|m| Some(TimestampingMode::from(m)))
315                            .map_or(Maybe::Unspecified, Maybe::Specified),
316                        uncapped: ts
317                            .uncapped
318                            .map(Some)
319                            .map_or(Maybe::Unspecified, Maybe::Specified),
320                    })
321                })
322                .map_or(Maybe::Unspecified, Maybe::Specified),
323            delete_on_empty: s
324                .delete_on_empty
325                .map(|doe| {
326                    Some(DeleteOnEmptyReconfiguration {
327                        min_age: doe
328                            .min_age
329                            .map(|h| Some(h.0))
330                            .map_or(Maybe::Unspecified, Maybe::Specified),
331                    })
332                })
333                .map_or(Maybe::Unspecified, Maybe::Specified),
334        }
335    }
336}
337
338pub fn json_schema() -> serde_json::Value {
339    serde_json::to_value(schemars::schema_for!(ResourcesSpec)).unwrap()
340}
341
342pub fn validate(spec: &ResourcesSpec) -> eyre::Result<()> {
343    let mut errors = Vec::new();
344    let mut seen_basins = std::collections::HashSet::new();
345
346    for basin_spec in &spec.basins {
347        if !seen_basins.insert(basin_spec.name.clone()) {
348            errors.push(format!("duplicate basin name {:?}", basin_spec.name));
349        }
350
351        if let Err(e) = basin_spec.name.parse::<BasinName>() {
352            errors.push(format!("invalid basin name {:?}: {}", basin_spec.name, e));
353            continue;
354        }
355
356        let mut seen_streams = std::collections::HashSet::new();
357        for stream_spec in &basin_spec.streams {
358            if !seen_streams.insert(stream_spec.name.clone()) {
359                errors.push(format!(
360                    "duplicate stream name {:?} in basin {:?}",
361                    stream_spec.name, basin_spec.name
362                ));
363            }
364            if let Err(e) = stream_spec.name.parse::<StreamName>() {
365                errors.push(format!(
366                    "invalid stream name {:?} in basin {:?}: {}",
367                    stream_spec.name, basin_spec.name, e
368                ));
369            }
370        }
371    }
372
373    if errors.is_empty() {
374        Ok(())
375    } else {
376        Err(eyre::eyre!("{}", errors.join("\n")))
377    }
378}
379
380pub fn load(path: &Path) -> eyre::Result<ResourcesSpec> {
381    let contents = std::fs::read_to_string(path)
382        .map_err(|e| eyre::eyre!("failed to read init file {:?}: {}", path, e))?;
383    let spec: ResourcesSpec = serde_json::from_str(&contents)
384        .map_err(|e| eyre::eyre!("failed to parse init file {:?}: {}", path, e))?;
385    Ok(spec)
386}
387
388pub async fn apply(backend: &Backend, spec: ResourcesSpec) -> eyre::Result<()> {
389    validate(&spec)?;
390
391    for basin_spec in spec.basins {
392        let basin: BasinName = basin_spec
393            .name
394            .parse()
395            .map_err(|e| eyre::eyre!("invalid basin name {:?}: {}", basin_spec.name, e))?;
396
397        let reconfiguration = basin_spec
398            .config
399            .map(BasinReconfiguration::from)
400            .unwrap_or_default();
401
402        backend
403            .create_basin(
404                basin.clone(),
405                reconfiguration,
406                CreateMode::CreateOrReconfigure,
407            )
408            .await
409            .map_err(|e| eyre::eyre!("failed to apply basin {:?}: {}", basin.as_ref(), e))?;
410
411        info!(basin = basin.as_ref(), "basin applied");
412
413        for stream_spec in basin_spec.streams {
414            let stream: StreamName = stream_spec
415                .name
416                .parse()
417                .map_err(|e| eyre::eyre!("invalid stream name {:?}: {}", stream_spec.name, e))?;
418
419            let reconfiguration = stream_spec
420                .config
421                .map(StreamReconfiguration::from)
422                .unwrap_or_default();
423
424            backend
425                .create_stream(
426                    basin.clone(),
427                    stream.clone(),
428                    reconfiguration,
429                    CreateMode::CreateOrReconfigure,
430                )
431                .await
432                .map_err(|e| {
433                    eyre::eyre!(
434                        "failed to apply stream {:?}/{:?}: {}",
435                        basin.as_ref(),
436                        stream.as_ref(),
437                        e
438                    )
439                })?;
440
441            info!(
442                basin = basin.as_ref(),
443                stream = stream.as_ref(),
444                "stream applied"
445            );
446        }
447    }
448    Ok(())
449}
450
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `json` as a [`ResourcesSpec`], panicking on malformed input.
    fn parse_spec(json: &str) -> ResourcesSpec {
        serde_json::from_str(json).expect("valid JSON")
    }

    #[test]
    fn empty_spec() {
        let spec = parse_spec("{}");
        assert!(spec.basins.is_empty());
    }

    #[test]
    fn basin_no_config() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin"}]}"#);
        assert_eq!(spec.basins.len(), 1);
        assert_eq!(spec.basins[0].name, "my-basin");
        assert!(spec.basins[0].config.is_none());
        assert!(spec.basins[0].streams.is_empty());
    }

    #[test]
    fn retention_policy_infinite() {
        let rp: RetentionPolicySpec = serde_json::from_str(r#""infinite""#).expect("deserialize");
        assert!(matches!(rp.0, RetentionPolicy::Infinite()));
    }

    #[test]
    fn retention_policy_infinite_is_case_insensitive() {
        let rp: RetentionPolicySpec = serde_json::from_str(r#""Infinite""#).expect("deserialize");
        assert!(matches!(rp.0, RetentionPolicy::Infinite()));
    }

    #[test]
    fn retention_policy_duration() {
        let rp: RetentionPolicySpec = serde_json::from_str(r#""7days""#).expect("deserialize");
        let RetentionPolicy::Age(d) = rp.0 else {
            panic!("expected an age-based retention policy");
        };
        assert_eq!(d, Duration::from_secs(7 * 24 * 3600));
    }

    #[test]
    fn retention_policy_invalid() {
        let err = serde_json::from_str::<RetentionPolicySpec>(r#""not-a-duration""#);
        assert!(err.is_err());
    }

    #[test]
    fn human_duration() {
        let hd: HumanDuration = serde_json::from_str(r#""1day""#).expect("deserialize");
        assert_eq!(hd.0, Duration::from_secs(86400));
    }

    #[test]
    fn human_duration_invalid() {
        assert!(serde_json::from_str::<HumanDuration>(r#""soon""#).is_err());
        // Durations must be strings, not bare numbers.
        assert!(serde_json::from_str::<HumanDuration>("60").is_err());
    }

    #[test]
    fn full_spec_roundtrip() {
        let json = r#"
        {
          "basins": [
            {
              "name": "my-basin",
              "config": {
                "create_stream_on_append": true,
                "create_stream_on_read": false,
                "default_stream_config": {
                  "storage_class": "express",
                  "retention_policy": "7days",
                  "timestamping": {
                    "mode": "client-prefer",
                    "uncapped": false
                  },
                  "delete_on_empty": {
                    "min_age": "1day"
                  }
                }
              },
              "streams": [
                {
                  "name": "events",
                  "config": {
                    "storage_class": "standard",
                    "retention_policy": "infinite"
                  }
                }
              ]
            }
          ]
        }"#;

        let spec = parse_spec(json);
        assert_eq!(spec.basins.len(), 1);
        let basin = &spec.basins[0];
        assert_eq!(basin.name, "my-basin");

        let config = basin.config.as_ref().unwrap();
        assert_eq!(config.create_stream_on_append, Some(true));
        assert_eq!(config.create_stream_on_read, Some(false));

        let dsc = config.default_stream_config.as_ref().unwrap();
        assert!(matches!(dsc.storage_class, Some(StorageClassSpec::Express)));
        assert!(matches!(
            dsc.retention_policy.as_ref().map(|r| &r.0),
            Some(RetentionPolicy::Age(_))
        ));

        let ts = dsc.timestamping.as_ref().unwrap();
        assert!(matches!(ts.mode, Some(TimestampingModeSpec::ClientPrefer)));
        assert_eq!(ts.uncapped, Some(false));

        let doe = dsc.delete_on_empty.as_ref().unwrap();
        assert_eq!(
            doe.min_age.as_ref().map(|h| h.0),
            Some(Duration::from_secs(86400))
        );

        assert_eq!(basin.streams.len(), 1);
        let stream = &basin.streams[0];
        assert_eq!(stream.name, "events");
        let sc = stream.config.as_ref().unwrap();
        assert!(matches!(sc.storage_class, Some(StorageClassSpec::Standard)));
        assert!(matches!(
            sc.retention_policy.as_ref().map(|r| &r.0),
            Some(RetentionPolicy::Infinite())
        ));
    }

    #[test]
    fn unknown_fields_rejected() {
        let basin_level =
            serde_json::from_str::<ResourcesSpec>(r#"{"basins":[{"name":"my-basin","bogus":1}]}"#);
        assert!(basin_level.is_err());

        let stream_config_level = serde_json::from_str::<ResourcesSpec>(
            r#"{"basins":[{"name":"b","streams":[{"name":"s","config":{"storage":"express"}}]}]}"#,
        );
        assert!(stream_config_level.is_err());
    }

    #[test]
    fn invalid_storage_class_rejected() {
        let unknown = serde_json::from_str::<ResourcesSpec>(
            r#"{"basins":[{"name":"b","streams":[{"name":"s","config":{"storage_class":"fast"}}]}]}"#,
        );
        assert!(unknown.is_err());
        // Variant names are kebab-case only.
        assert!(serde_json::from_str::<StorageClassSpec>(r#""Express""#).is_err());
    }

    #[test]
    fn basin_config_conversion() {
        let spec = BasinConfigSpec {
            default_stream_config: None,
            create_stream_on_append: Some(true),
            create_stream_on_read: None,
        };
        let reconfig = BasinReconfiguration::from(spec);
        assert!(matches!(
            reconfig.create_stream_on_append,
            Maybe::Specified(true)
        ));
        assert!(matches!(reconfig.create_stream_on_read, Maybe::Unspecified));
        assert!(matches!(reconfig.default_stream_config, Maybe::Unspecified));
    }

    #[test]
    fn basin_config_conversion_with_default_stream_config() {
        let spec = BasinConfigSpec {
            default_stream_config: Some(StreamConfigSpec::default()),
            ..Default::default()
        };
        let reconfig = BasinReconfiguration::from(spec);
        assert!(matches!(
            reconfig.default_stream_config,
            Maybe::Specified(Some(_))
        ));
    }

    #[test]
    fn validate_valid_spec() {
        let spec = parse_spec(
            r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"logs"}]}]}"#,
        );
        assert!(validate(&spec).is_ok());
    }

    #[test]
    fn validate_invalid_basin_name() {
        let spec = parse_spec(r#"{"basins":[{"name":"INVALID_BASIN"}]}"#);
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("invalid basin name"));
    }

    #[test]
    fn validate_invalid_stream_name() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin","streams":[{"name":""}]}]}"#);
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("invalid stream name"));
    }

    #[test]
    fn validate_duplicate_basin_names() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin"},{"name":"my-basin"}]}"#);
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("duplicate basin name"));
    }

    #[test]
    fn validate_duplicate_stream_names() {
        let spec = parse_spec(
            r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"events"}]}]}"#,
        );
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("duplicate stream name"));
    }

    #[test]
    fn validate_multiple_errors() {
        let spec = parse_spec(r#"{"basins":[{"name":"INVALID"},{"name":"INVALID"}]}"#);
        let err = validate(&spec).unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("invalid basin name"));
        assert!(msg.contains("duplicate basin name"));
    }

    #[test]
    fn json_schema_is_valid() {
        let schema = json_schema();
        assert!(schema.is_object());
        let schema_obj = schema.as_object().unwrap();
        // Should have at minimum a definitions/properties structure
        assert!(
            schema_obj.contains_key("definitions") || schema_obj.contains_key("properties"),
            "schema should have definitions or properties"
        );
    }

    #[test]
    fn stream_config_conversion() {
        let spec = StreamConfigSpec {
            storage_class: Some(StorageClassSpec::Standard),
            retention_policy: Some(RetentionPolicySpec(RetentionPolicy::Infinite())),
            timestamping: None,
            delete_on_empty: None,
        };
        let reconfig = StreamReconfiguration::from(spec);
        assert!(matches!(
            reconfig.storage_class,
            Maybe::Specified(Some(StorageClass::Standard))
        ));
        assert!(matches!(
            reconfig.retention_policy,
            Maybe::Specified(Some(RetentionPolicy::Infinite()))
        ));
        assert!(matches!(reconfig.timestamping, Maybe::Unspecified));
        assert!(matches!(reconfig.delete_on_empty, Maybe::Unspecified));
    }

    #[test]
    fn stream_config_conversion_nested() {
        let spec = StreamConfigSpec {
            storage_class: None,
            retention_policy: None,
            timestamping: Some(TimestampingSpec {
                mode: Some(TimestampingModeSpec::Arrival),
                uncapped: Some(true),
            }),
            delete_on_empty: Some(DeleteOnEmptySpec {
                min_age: Some(HumanDuration(Duration::from_secs(60))),
            }),
        };
        let reconfig = StreamReconfiguration::from(spec);
        assert!(matches!(reconfig.storage_class, Maybe::Unspecified));
        assert!(matches!(reconfig.retention_policy, Maybe::Unspecified));

        let Maybe::Specified(Some(ts)) = &reconfig.timestamping else {
            panic!("timestamping should be specified");
        };
        assert!(matches!(
            ts.mode,
            Maybe::Specified(Some(TimestampingMode::Arrival))
        ));
        assert!(matches!(ts.uncapped, Maybe::Specified(Some(true))));

        let Maybe::Specified(Some(doe)) = &reconfig.delete_on_empty else {
            panic!("delete_on_empty should be specified");
        };
        assert!(matches!(
            &doe.min_age,
            Maybe::Specified(Some(age)) if *age == Duration::from_secs(60)
        ));
    }
}