Skip to main content

s2_lite/
init.rs

1//! Declarative basin/stream initialization from a JSON spec file.
2//!
3//! Loaded at startup when `--init-file` / `S2LITE_INIT_FILE` is set.
4
5use std::{borrow::Cow, path::Path, time::Duration};
6
7use s2_common::{
8    maybe::Maybe,
9    types::{
10        basin::BasinName,
11        config::{
12            BasinReconfiguration, DeleteOnEmptyReconfiguration, RetentionPolicy, StorageClass,
13            StreamReconfiguration, TimestampingMode, TimestampingReconfiguration,
14        },
15        resources::CreateMode,
16        stream::StreamName,
17    },
18};
19use serde::{Deserialize, Serialize};
20use tracing::info;
21
22use crate::backend::Backend;
23
24#[derive(Debug, Deserialize, Default, schemars::JsonSchema)]
25pub struct ResourcesSpec {
26    #[serde(default)]
27    pub basins: Vec<BasinSpec>,
28}
29
30#[derive(Debug, Deserialize, schemars::JsonSchema)]
31#[serde(deny_unknown_fields)]
32pub struct BasinSpec {
33    pub name: String,
34    #[serde(default)]
35    pub config: Option<BasinConfigSpec>,
36    #[serde(default)]
37    pub streams: Vec<StreamSpec>,
38}
39
40#[derive(Debug, Deserialize, schemars::JsonSchema)]
41#[serde(deny_unknown_fields)]
42pub struct StreamSpec {
43    pub name: String,
44    #[serde(default)]
45    pub config: Option<StreamConfigSpec>,
46}
47
48#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
49#[serde(deny_unknown_fields)]
50pub struct BasinConfigSpec {
51    #[serde(default)]
52    pub default_stream_config: Option<StreamConfigSpec>,
53    /// Create stream on append if it doesn't exist, using the default stream configuration.
54    #[serde(default)]
55    pub create_stream_on_append: Option<bool>,
56    /// Create stream on read if it doesn't exist, using the default stream configuration.
57    #[serde(default)]
58    pub create_stream_on_read: Option<bool>,
59}
60
61#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
62#[serde(deny_unknown_fields)]
63pub struct StreamConfigSpec {
64    /// Storage class for recent writes.
65    #[serde(default)]
66    pub storage_class: Option<StorageClassSpec>,
67    /// Retention policy for the stream. If unspecified, the default is to retain records for 7
68    /// days.
69    #[serde(default)]
70    pub retention_policy: Option<RetentionPolicySpec>,
71    /// Timestamping behavior.
72    #[serde(default)]
73    pub timestamping: Option<TimestampingSpec>,
74    /// Delete-on-empty configuration.
75    #[serde(default)]
76    pub delete_on_empty: Option<DeleteOnEmptySpec>,
77}
78
79#[derive(Debug, Clone, Deserialize, Serialize)]
80#[serde(rename_all = "kebab-case")]
81pub enum StorageClassSpec {
82    Standard,
83    Express,
84}
85
86impl schemars::JsonSchema for StorageClassSpec {
87    fn schema_name() -> Cow<'static, str> {
88        "StorageClassSpec".into()
89    }
90
91    fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
92        schemars::json_schema!({
93            "type": "string",
94            "description": "Storage class for recent writes.",
95            "enum": ["standard", "express"]
96        })
97    }
98}
99
100impl From<StorageClassSpec> for StorageClass {
101    fn from(s: StorageClassSpec) -> Self {
102        match s {
103            StorageClassSpec::Standard => StorageClass::Standard,
104            StorageClassSpec::Express => StorageClass::Express,
105        }
106    }
107}
108
109/// Accepts `"infinite"` or a humantime duration string such as `"7d"`, `"1w"`.
110#[derive(Debug, Clone, Copy)]
111pub struct RetentionPolicySpec(pub RetentionPolicy);
112
113impl RetentionPolicySpec {
114    pub fn age_secs(self) -> Option<u64> {
115        self.0.age().map(|d| d.as_secs())
116    }
117}
118
119impl TryFrom<String> for RetentionPolicySpec {
120    type Error = String;
121
122    fn try_from(s: String) -> Result<Self, Self::Error> {
123        if s.eq_ignore_ascii_case("infinite") {
124            return Ok(RetentionPolicySpec(RetentionPolicy::Infinite()));
125        }
126        let d = humantime::parse_duration(&s)
127            .map_err(|e| format!("invalid retention_policy {:?}: {}", s, e))?;
128        Ok(RetentionPolicySpec(RetentionPolicy::Age(d)))
129    }
130}
131
132impl<'de> Deserialize<'de> for RetentionPolicySpec {
133    fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
134        let s = String::deserialize(d)?;
135        RetentionPolicySpec::try_from(s).map_err(serde::de::Error::custom)
136    }
137}
138
139impl schemars::JsonSchema for RetentionPolicySpec {
140    fn schema_name() -> Cow<'static, str> {
141        "RetentionPolicySpec".into()
142    }
143
144    fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
145        schemars::json_schema!({
146            "type": "string",
147            "description": "Retain records unless explicitly trimmed (\"infinite\"), or automatically \
148                trim records older than the given duration (e.g. \"7days\", \"1week\").",
149            "examples": ["infinite", "7days", "1week"]
150        })
151    }
152}
153
154#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
155#[serde(deny_unknown_fields)]
156pub struct TimestampingSpec {
157    /// Timestamping mode for appends that influences how timestamps are handled.
158    #[serde(default)]
159    pub mode: Option<TimestampingModeSpec>,
160    /// Allow client-specified timestamps to exceed the arrival time.
161    /// If this is `false` or not set, client timestamps will be capped at the arrival time.
162    #[serde(default)]
163    pub uncapped: Option<bool>,
164}
165
166#[derive(Debug, Clone, Deserialize, Serialize)]
167#[serde(rename_all = "kebab-case")]
168pub enum TimestampingModeSpec {
169    ClientPrefer,
170    ClientRequire,
171    Arrival,
172}
173
174impl schemars::JsonSchema for TimestampingModeSpec {
175    fn schema_name() -> Cow<'static, str> {
176        "TimestampingModeSpec".into()
177    }
178
179    fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
180        schemars::json_schema!({
181            "type": "string",
182            "description": "Timestamping mode for appends that influences how timestamps are handled.",
183            "enum": ["client-prefer", "client-require", "arrival"]
184        })
185    }
186}
187
188impl From<TimestampingModeSpec> for TimestampingMode {
189    fn from(m: TimestampingModeSpec) -> Self {
190        match m {
191            TimestampingModeSpec::ClientPrefer => TimestampingMode::ClientPrefer,
192            TimestampingModeSpec::ClientRequire => TimestampingMode::ClientRequire,
193            TimestampingModeSpec::Arrival => TimestampingMode::Arrival,
194        }
195    }
196}
197
198#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
199#[serde(deny_unknown_fields)]
200pub struct DeleteOnEmptySpec {
201    /// Minimum age before an empty stream can be deleted.
202    /// Set to 0 (default) to disable delete-on-empty (don't delete automatically).
203    #[serde(default)]
204    pub min_age: Option<HumanDuration>,
205}
206
/// A [`Duration`] written in the init file as a humantime string, such as `"1d"` or `"2h 30m"`.
#[derive(Clone, Copy, Debug)]
pub struct HumanDuration(pub Duration);
210
211impl TryFrom<String> for HumanDuration {
212    type Error = String;
213
214    fn try_from(s: String) -> Result<Self, Self::Error> {
215        humantime::parse_duration(&s)
216            .map(HumanDuration)
217            .map_err(|e| format!("invalid duration {:?}: {}", s, e))
218    }
219}
220
221impl<'de> Deserialize<'de> for HumanDuration {
222    fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
223        let s = String::deserialize(d)?;
224        HumanDuration::try_from(s).map_err(serde::de::Error::custom)
225    }
226}
227
228impl schemars::JsonSchema for HumanDuration {
229    fn schema_name() -> Cow<'static, str> {
230        "HumanDuration".into()
231    }
232
233    fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
234        schemars::json_schema!({
235            "type": "string",
236            "description": "A duration string in humantime format, e.g. \"1day\", \"2h 30m\"",
237            "examples": ["1day", "2h 30m"]
238        })
239    }
240}
241
242impl From<BasinConfigSpec> for BasinReconfiguration {
243    fn from(s: BasinConfigSpec) -> Self {
244        BasinReconfiguration {
245            default_stream_config: s
246                .default_stream_config
247                .map(|dsc| Some(StreamReconfiguration::from(dsc)))
248                .map_or(Maybe::Unspecified, Maybe::Specified),
249            create_stream_on_append: s
250                .create_stream_on_append
251                .map_or(Maybe::Unspecified, Maybe::Specified),
252            create_stream_on_read: s
253                .create_stream_on_read
254                .map_or(Maybe::Unspecified, Maybe::Specified),
255        }
256    }
257}
258
259impl From<StreamConfigSpec> for StreamReconfiguration {
260    fn from(s: StreamConfigSpec) -> Self {
261        StreamReconfiguration {
262            storage_class: s
263                .storage_class
264                .map(|sc| Some(StorageClass::from(sc)))
265                .map_or(Maybe::Unspecified, Maybe::Specified),
266            retention_policy: s
267                .retention_policy
268                .map(|rp| Some(rp.0))
269                .map_or(Maybe::Unspecified, Maybe::Specified),
270            timestamping: s
271                .timestamping
272                .map(|ts| {
273                    Some(TimestampingReconfiguration {
274                        mode: ts
275                            .mode
276                            .map(|m| Some(TimestampingMode::from(m)))
277                            .map_or(Maybe::Unspecified, Maybe::Specified),
278                        uncapped: ts
279                            .uncapped
280                            .map(Some)
281                            .map_or(Maybe::Unspecified, Maybe::Specified),
282                    })
283                })
284                .map_or(Maybe::Unspecified, Maybe::Specified),
285            delete_on_empty: s
286                .delete_on_empty
287                .map(|doe| {
288                    Some(DeleteOnEmptyReconfiguration {
289                        min_age: doe
290                            .min_age
291                            .map(|h| Some(h.0))
292                            .map_or(Maybe::Unspecified, Maybe::Specified),
293                    })
294                })
295                .map_or(Maybe::Unspecified, Maybe::Specified),
296        }
297    }
298}
299
300pub fn json_schema() -> serde_json::Value {
301    serde_json::to_value(schemars::schema_for!(ResourcesSpec)).unwrap()
302}
303
304pub fn validate(spec: &ResourcesSpec) -> eyre::Result<()> {
305    let mut errors = Vec::new();
306    let mut seen_basins = std::collections::HashSet::new();
307
308    for basin_spec in &spec.basins {
309        if !seen_basins.insert(basin_spec.name.clone()) {
310            errors.push(format!("duplicate basin name {:?}", basin_spec.name));
311        }
312
313        if let Err(e) = basin_spec.name.parse::<BasinName>() {
314            errors.push(format!("invalid basin name {:?}: {}", basin_spec.name, e));
315            continue;
316        }
317
318        let mut seen_streams = std::collections::HashSet::new();
319        for stream_spec in &basin_spec.streams {
320            if !seen_streams.insert(stream_spec.name.clone()) {
321                errors.push(format!(
322                    "duplicate stream name {:?} in basin {:?}",
323                    stream_spec.name, basin_spec.name
324                ));
325            }
326            if let Err(e) = stream_spec.name.parse::<StreamName>() {
327                errors.push(format!(
328                    "invalid stream name {:?} in basin {:?}: {}",
329                    stream_spec.name, basin_spec.name, e
330                ));
331            }
332        }
333    }
334
335    if errors.is_empty() {
336        Ok(())
337    } else {
338        Err(eyre::eyre!("{}", errors.join("\n")))
339    }
340}
341
342pub fn load(path: &Path) -> eyre::Result<ResourcesSpec> {
343    let contents = std::fs::read_to_string(path)
344        .map_err(|e| eyre::eyre!("failed to read init file {:?}: {}", path, e))?;
345    let spec: ResourcesSpec = serde_json::from_str(&contents)
346        .map_err(|e| eyre::eyre!("failed to parse init file {:?}: {}", path, e))?;
347    Ok(spec)
348}
349
350pub async fn apply(backend: &Backend, spec: ResourcesSpec) -> eyre::Result<()> {
351    validate(&spec)?;
352
353    for basin_spec in spec.basins {
354        let basin: BasinName = basin_spec
355            .name
356            .parse()
357            .map_err(|e| eyre::eyre!("invalid basin name {:?}: {}", basin_spec.name, e))?;
358
359        let reconfiguration = basin_spec
360            .config
361            .map(BasinReconfiguration::from)
362            .unwrap_or_default();
363
364        backend
365            .create_basin(
366                basin.clone(),
367                reconfiguration,
368                CreateMode::CreateOrReconfigure,
369            )
370            .await
371            .map_err(|e| eyre::eyre!("failed to apply basin {:?}: {}", basin.as_ref(), e))?;
372
373        info!(basin = basin.as_ref(), "basin applied");
374
375        for stream_spec in basin_spec.streams {
376            let stream: StreamName = stream_spec
377                .name
378                .parse()
379                .map_err(|e| eyre::eyre!("invalid stream name {:?}: {}", stream_spec.name, e))?;
380
381            let reconfiguration = stream_spec
382                .config
383                .map(StreamReconfiguration::from)
384                .unwrap_or_default();
385
386            backend
387                .create_stream(
388                    basin.clone(),
389                    stream.clone(),
390                    reconfiguration,
391                    CreateMode::CreateOrReconfigure,
392                )
393                .await
394                .map_err(|e| {
395                    eyre::eyre!(
396                        "failed to apply stream {:?}/{:?}: {}",
397                        basin.as_ref(),
398                        stream.as_ref(),
399                        e
400                    )
401                })?;
402
403            info!(
404                basin = basin.as_ref(),
405                stream = stream.as_ref(),
406                "stream applied"
407            );
408        }
409    }
410    Ok(())
411}
412
#[cfg(test)]
mod tests {
    use super::*;

    fn parse_spec(json: &str) -> ResourcesSpec {
        serde_json::from_str(json).expect("valid JSON")
    }

    #[test]
    fn empty_spec() {
        let spec = parse_spec("{}");
        assert!(spec.basins.is_empty());
    }

    #[test]
    fn basin_no_config() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin"}]}"#);
        assert_eq!(spec.basins.len(), 1);
        assert_eq!(spec.basins[0].name, "my-basin");
        assert!(spec.basins[0].config.is_none());
        assert!(spec.basins[0].streams.is_empty());
    }

    #[test]
    fn retention_policy_infinite() {
        let rp: RetentionPolicySpec = serde_json::from_str(r#""infinite""#).expect("deserialize");
        assert!(matches!(rp.0, RetentionPolicy::Infinite()));
    }

    #[test]
    fn retention_policy_infinite_is_case_insensitive() {
        let rp: RetentionPolicySpec = serde_json::from_str(r#""INFINITE""#).expect("deserialize");
        assert!(matches!(rp.0, RetentionPolicy::Infinite()));
    }

    #[test]
    fn retention_policy_duration() {
        let rp: RetentionPolicySpec = serde_json::from_str(r#""7days""#).expect("deserialize");
        // let-else instead of `if let`, so a non-age policy fails loudly rather than silently
        // skipping the value check.
        let RetentionPolicy::Age(d) = rp.0 else {
            panic!("expected an age-based retention policy, got {:?}", rp.0);
        };
        assert_eq!(d, Duration::from_secs(7 * 24 * 3600));
    }

    #[test]
    fn retention_policy_invalid() {
        let err = serde_json::from_str::<RetentionPolicySpec>(r#""not-a-duration""#);
        assert!(err.is_err());
    }

    #[test]
    fn retention_policy_age_secs() {
        let rp = RetentionPolicySpec(RetentionPolicy::Age(Duration::from_secs(90)));
        assert_eq!(rp.age_secs(), Some(90));
    }

    #[test]
    fn human_duration() {
        let hd: HumanDuration = serde_json::from_str(r#""1day""#).expect("deserialize");
        assert_eq!(hd.0, Duration::from_secs(86400));
    }

    #[test]
    fn human_duration_invalid() {
        assert!(serde_json::from_str::<HumanDuration>(r#""not-a-duration""#).is_err());
    }

    #[test]
    fn storage_class_rejects_unknown_value() {
        assert!(serde_json::from_str::<StorageClassSpec>(r#""premium""#).is_err());
    }

    #[test]
    fn timestamping_mode_kebab_case() {
        let mode: TimestampingModeSpec =
            serde_json::from_str(r#""client-require""#).expect("deserialize");
        assert!(matches!(mode, TimestampingModeSpec::ClientRequire));
    }

    #[test]
    fn nested_specs_reject_unknown_fields() {
        assert!(
            serde_json::from_str::<ResourcesSpec>(r#"{"basins":[{"name":"my-basin","extra":1}]}"#)
                .is_err()
        );
        assert!(serde_json::from_str::<StreamConfigSpec>(
            r#"{"storage_class":"express","bogus":true}"#
        )
        .is_err());
    }

    /// Deserializes a spec exercising every field and checks the parsed values.
    #[test]
    fn full_spec_roundtrip() {
        let json = r#"
        {
          "basins": [
            {
              "name": "my-basin",
              "config": {
                "create_stream_on_append": true,
                "create_stream_on_read": false,
                "default_stream_config": {
                  "storage_class": "express",
                  "retention_policy": "7days",
                  "timestamping": {
                    "mode": "client-prefer",
                    "uncapped": false
                  },
                  "delete_on_empty": {
                    "min_age": "1day"
                  }
                }
              },
              "streams": [
                {
                  "name": "events",
                  "config": {
                    "storage_class": "standard",
                    "retention_policy": "infinite"
                  }
                }
              ]
            }
          ]
        }"#;

        let spec = parse_spec(json);
        assert_eq!(spec.basins.len(), 1);
        let basin = &spec.basins[0];
        assert_eq!(basin.name, "my-basin");

        let config = basin.config.as_ref().unwrap();
        assert_eq!(config.create_stream_on_append, Some(true));
        assert_eq!(config.create_stream_on_read, Some(false));

        let dsc = config.default_stream_config.as_ref().unwrap();
        assert!(matches!(dsc.storage_class, Some(StorageClassSpec::Express)));
        assert!(matches!(
            dsc.retention_policy.as_ref().map(|r| &r.0),
            Some(RetentionPolicy::Age(_))
        ));

        let ts = dsc.timestamping.as_ref().unwrap();
        assert!(matches!(ts.mode, Some(TimestampingModeSpec::ClientPrefer)));
        assert_eq!(ts.uncapped, Some(false));

        let doe = dsc.delete_on_empty.as_ref().unwrap();
        assert_eq!(
            doe.min_age.as_ref().map(|h| h.0),
            Some(Duration::from_secs(86400))
        );

        assert_eq!(basin.streams.len(), 1);
        let stream = &basin.streams[0];
        assert_eq!(stream.name, "events");
        let sc = stream.config.as_ref().unwrap();
        assert!(matches!(sc.storage_class, Some(StorageClassSpec::Standard)));
        assert!(matches!(
            sc.retention_policy.as_ref().map(|r| &r.0),
            Some(RetentionPolicy::Infinite())
        ));
    }

    #[test]
    fn basin_config_conversion() {
        let spec = BasinConfigSpec {
            default_stream_config: None,
            create_stream_on_append: Some(true),
            create_stream_on_read: None,
        };
        let reconfig = BasinReconfiguration::from(spec);
        assert!(matches!(
            reconfig.create_stream_on_append,
            Maybe::Specified(true)
        ));
        assert!(matches!(reconfig.create_stream_on_read, Maybe::Unspecified));
        assert!(matches!(reconfig.default_stream_config, Maybe::Unspecified));
    }

    #[test]
    fn basin_config_conversion_with_default_stream_config() {
        let spec = BasinConfigSpec {
            default_stream_config: Some(StreamConfigSpec::default()),
            ..Default::default()
        };
        let reconfig = BasinReconfiguration::from(spec);
        assert!(matches!(
            reconfig.default_stream_config,
            Maybe::Specified(Some(_))
        ));
        assert!(matches!(reconfig.create_stream_on_append, Maybe::Unspecified));
    }

    #[test]
    fn validate_valid_spec() {
        let spec = parse_spec(
            r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"logs"}]}]}"#,
        );
        assert!(validate(&spec).is_ok());
    }

    #[test]
    fn validate_same_stream_name_in_different_basins() {
        let spec = parse_spec(
            r#"{"basins":[{"name":"first-basin","streams":[{"name":"events"}]},{"name":"second-basin","streams":[{"name":"events"}]}]}"#,
        );
        assert!(validate(&spec).is_ok());
    }

    #[test]
    fn validate_invalid_basin_name() {
        let spec = parse_spec(r#"{"basins":[{"name":"INVALID_BASIN"}]}"#);
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("invalid basin name"));
    }

    #[test]
    fn validate_invalid_stream_name() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin","streams":[{"name":""}]}]}"#);
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("invalid stream name"));
    }

    #[test]
    fn validate_duplicate_basin_names() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin"},{"name":"my-basin"}]}"#);
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("duplicate basin name"));
    }

    #[test]
    fn validate_duplicate_stream_names() {
        let spec = parse_spec(
            r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"events"}]}]}"#,
        );
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("duplicate stream name"));
    }

    #[test]
    fn validate_multiple_errors() {
        let spec = parse_spec(r#"{"basins":[{"name":"INVALID"},{"name":"INVALID"}]}"#);
        let err = validate(&spec).unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("invalid basin name"));
        assert!(msg.contains("duplicate basin name"));
    }

    #[test]
    fn json_schema_is_valid() {
        let schema = json_schema();
        assert!(schema.is_object());
        let schema_obj = schema.as_object().unwrap();

        // schemars' default generator targets JSON Schema draft 2020-12.
        assert_eq!(
            schema_obj.get("$schema"),
            Some(&serde_json::Value::String(
                "https://json-schema.org/draft/2020-12/schema".to_string()
            ))
        );

        assert!(
            schema_obj.contains_key("properties"),
            "schema should have root properties"
        );

        assert!(
            schema_obj.contains_key("$defs"),
            "schema should have $defs for reusable definitions"
        );

        let properties = schema_obj.get("properties").unwrap().as_object().unwrap();
        assert!(
            properties.contains_key("basins"),
            "schema should include the `basins` property"
        );
    }

    #[test]
    fn stream_config_conversion() {
        let spec = StreamConfigSpec {
            storage_class: Some(StorageClassSpec::Standard),
            retention_policy: Some(RetentionPolicySpec(RetentionPolicy::Infinite())),
            timestamping: None,
            delete_on_empty: None,
        };
        let reconfig = StreamReconfiguration::from(spec);
        assert!(matches!(
            reconfig.storage_class,
            Maybe::Specified(Some(StorageClass::Standard))
        ));
        assert!(matches!(
            reconfig.retention_policy,
            Maybe::Specified(Some(RetentionPolicy::Infinite()))
        ));
        assert!(matches!(reconfig.timestamping, Maybe::Unspecified));
        assert!(matches!(reconfig.delete_on_empty, Maybe::Unspecified));
    }

    #[test]
    fn stream_config_conversion_nested() {
        let spec = StreamConfigSpec {
            timestamping: Some(TimestampingSpec {
                mode: Some(TimestampingModeSpec::Arrival),
                uncapped: None,
            }),
            delete_on_empty: Some(DeleteOnEmptySpec {
                min_age: Some(HumanDuration(Duration::from_secs(60))),
            }),
            ..Default::default()
        };
        let reconfig = StreamReconfiguration::from(spec);
        assert!(matches!(reconfig.storage_class, Maybe::Unspecified));

        let Maybe::Specified(Some(ts)) = &reconfig.timestamping else {
            panic!("timestamping should be specified");
        };
        assert!(matches!(
            ts.mode,
            Maybe::Specified(Some(TimestampingMode::Arrival))
        ));
        assert!(matches!(ts.uncapped, Maybe::Unspecified));

        let Maybe::Specified(Some(doe)) = &reconfig.delete_on_empty else {
            panic!("delete_on_empty should be specified");
        };
        assert!(matches!(
            doe.min_age,
            Maybe::Specified(Some(d)) if d == Duration::from_secs(60)
        ));
    }
}