1use std::{borrow::Cow, path::Path, time::Duration};
6
7use s2_common::{
8 maybe::Maybe,
9 types::{
10 basin::{BasinName, CreateBasinIntent},
11 config::{
12 BasinReconfiguration, DeleteOnEmptyReconfiguration, RetentionPolicy, StorageClass,
13 StreamReconfiguration, TimestampingMode, TimestampingReconfiguration,
14 },
15 stream::{CreateStreamIntent, StreamName},
16 },
17};
18use serde::{Deserialize, Serialize};
19use tracing::info;
20
21use crate::backend::Backend;
22
/// Root of a resources spec file: the basins (with their streams) that `apply`
/// creates or reconfigures.
///
/// Unlike the nested spec types, this type does not set `deny_unknown_fields`.
#[derive(Debug, Deserialize, Default, schemars::JsonSchema)]
pub struct ResourcesSpec {
    /// Basins to create or reconfigure; an omitted key yields an empty list.
    #[serde(default)]
    pub basins: Vec<BasinSpec>,
}
28
/// A basin to create or reconfigure, together with the streams declared under it.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct BasinSpec {
    /// Basin name; `validate` rejects values that do not parse as a `BasinName`.
    pub name: String,
    /// Optional basin configuration; when omitted, `apply` uses
    /// `BasinReconfiguration::default()`.
    #[serde(default)]
    pub config: Option<BasinConfigSpec>,
    /// Streams to create or reconfigure inside this basin.
    #[serde(default)]
    pub streams: Vec<StreamSpec>,
}
38
/// A stream to create or reconfigure inside its enclosing basin.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct StreamSpec {
    /// Stream name; `validate` rejects values that do not parse as a `StreamName`.
    pub name: String,
    /// Optional stream configuration; when omitted, `apply` uses
    /// `StreamReconfiguration::default()`.
    #[serde(default)]
    pub config: Option<StreamConfigSpec>,
}
46
/// Basin-level settings accepted in a spec file.
///
/// Every field is optional; an omitted field stays unspecified when the spec is
/// converted into a `BasinReconfiguration`.
#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct BasinConfigSpec {
    /// Stream configuration carried in the basin's `default_stream_config` setting.
    #[serde(default)]
    pub default_stream_config: Option<StreamConfigSpec>,
    /// Encryption algorithm to apply to newly created streams in the basin.
    #[serde(default)]
    pub stream_cipher: Option<EncryptionAlgorithmSpec>,
    /// Value for the basin's `create_stream_on_append` setting.
    #[serde(default)]
    pub create_stream_on_append: Option<bool>,
    /// Value for the basin's `create_stream_on_read` setting.
    #[serde(default)]
    pub create_stream_on_read: Option<bool>,
}
62
/// Stream-level settings accepted in a spec file.
///
/// Every field is optional; an omitted field stays unspecified when the spec is
/// converted into a `StreamReconfiguration`.
#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct StreamConfigSpec {
    /// Storage class for recent writes.
    #[serde(default)]
    pub storage_class: Option<StorageClassSpec>,
    /// Either `"infinite"` (retain records unless explicitly trimmed) or a
    /// humantime duration such as `"7days"` after which records are trimmed.
    #[serde(default)]
    pub retention_policy: Option<RetentionPolicySpec>,
    /// Timestamping settings (`mode` and `uncapped`).
    #[serde(default)]
    pub timestamping: Option<TimestampingSpec>,
    /// Delete-on-empty settings (`min_age`).
    #[serde(default)]
    pub delete_on_empty: Option<DeleteOnEmptySpec>,
}
80
81#[derive(Debug, Clone, Deserialize, Serialize)]
82#[serde(rename_all = "kebab-case")]
83pub enum StorageClassSpec {
84 Standard,
85 Express,
86}
87
88impl schemars::JsonSchema for StorageClassSpec {
89 fn schema_name() -> Cow<'static, str> {
90 "StorageClassSpec".into()
91 }
92
93 fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
94 schemars::json_schema!({
95 "type": "string",
96 "description": "Storage class for recent writes.",
97 "enum": ["standard", "express"]
98 })
99 }
100}
101
102impl From<StorageClassSpec> for StorageClass {
103 fn from(s: StorageClassSpec) -> Self {
104 match s {
105 StorageClassSpec::Standard => StorageClass::Standard,
106 StorageClassSpec::Express => StorageClass::Express,
107 }
108 }
109}
110
/// Encryption algorithm as written in a spec file.
///
/// Each variant is renamed explicitly to its wire name.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub enum EncryptionAlgorithmSpec {
    /// Written as `"aegis-256"`.
    #[serde(rename = "aegis-256")]
    Aegis256,
    /// Written as `"aes-256-gcm"`.
    #[serde(rename = "aes-256-gcm")]
    Aes256Gcm,
}
118
119impl schemars::JsonSchema for EncryptionAlgorithmSpec {
120 fn schema_name() -> Cow<'static, str> {
121 "EncryptionAlgorithmSpec".into()
122 }
123
124 fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
125 schemars::json_schema!({
126 "type": "string",
127 "description": "Encryption algorithm to apply to newly created streams in the basin.",
128 "enum": ["aegis-256", "aes-256-gcm"]
129 })
130 }
131}
132
133impl From<EncryptionAlgorithmSpec> for s2_common::encryption::EncryptionAlgorithm {
134 fn from(m: EncryptionAlgorithmSpec) -> Self {
135 match m {
136 EncryptionAlgorithmSpec::Aegis256 => Self::Aegis256,
137 EncryptionAlgorithmSpec::Aes256Gcm => Self::Aes256Gcm,
138 }
139 }
140}
141
142#[derive(Debug, Clone, Copy)]
144pub struct RetentionPolicySpec(pub RetentionPolicy);
145
146impl RetentionPolicySpec {
147 pub fn age_secs(self) -> Option<u64> {
148 self.0.age().map(|d| d.as_secs())
149 }
150}
151
152impl TryFrom<String> for RetentionPolicySpec {
153 type Error = String;
154
155 fn try_from(s: String) -> Result<Self, Self::Error> {
156 if s.eq_ignore_ascii_case("infinite") {
157 return Ok(RetentionPolicySpec(RetentionPolicy::Infinite()));
158 }
159 let d = humantime::parse_duration(&s)
160 .map_err(|e| format!("invalid retention_policy {:?}: {}", s, e))?;
161 Ok(RetentionPolicySpec(RetentionPolicy::Age(d)))
162 }
163}
164
165impl<'de> Deserialize<'de> for RetentionPolicySpec {
166 fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
167 let s = String::deserialize(d)?;
168 RetentionPolicySpec::try_from(s).map_err(serde::de::Error::custom)
169 }
170}
171
172impl schemars::JsonSchema for RetentionPolicySpec {
173 fn schema_name() -> Cow<'static, str> {
174 "RetentionPolicySpec".into()
175 }
176
177 fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
178 schemars::json_schema!({
179 "type": "string",
180 "description": "Retain records unless explicitly trimmed (\"infinite\"), or automatically \
181 trim records older than the given duration (e.g. \"7days\", \"1week\").",
182 "examples": ["infinite", "7days", "1week"]
183 })
184 }
185}
186
/// Timestamping settings accepted in a spec file.
///
/// Both fields are optional; an omitted field stays unspecified when converted
/// into a `TimestampingReconfiguration`.
#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct TimestampingSpec {
    /// Timestamping mode for appends that influences how timestamps are handled.
    #[serde(default)]
    pub mode: Option<TimestampingModeSpec>,
    /// Value for the `uncapped` timestamping setting.
    #[serde(default)]
    pub uncapped: Option<bool>,
}
198
199#[derive(Debug, Clone, Deserialize, Serialize)]
200#[serde(rename_all = "kebab-case")]
201pub enum TimestampingModeSpec {
202 ClientPrefer,
203 ClientRequire,
204 Arrival,
205}
206
207impl schemars::JsonSchema for TimestampingModeSpec {
208 fn schema_name() -> Cow<'static, str> {
209 "TimestampingModeSpec".into()
210 }
211
212 fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
213 schemars::json_schema!({
214 "type": "string",
215 "description": "Timestamping mode for appends that influences how timestamps are handled.",
216 "enum": ["client-prefer", "client-require", "arrival"]
217 })
218 }
219}
220
221impl From<TimestampingModeSpec> for TimestampingMode {
222 fn from(m: TimestampingModeSpec) -> Self {
223 match m {
224 TimestampingModeSpec::ClientPrefer => TimestampingMode::ClientPrefer,
225 TimestampingModeSpec::ClientRequire => TimestampingMode::ClientRequire,
226 TimestampingModeSpec::Arrival => TimestampingMode::Arrival,
227 }
228 }
229}
230
/// Delete-on-empty settings accepted in a spec file.
#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct DeleteOnEmptySpec {
    /// Value for the `min_age` setting, given as a humantime duration string
    /// such as `"1day"`; when omitted the setting stays unspecified.
    #[serde(default)]
    pub min_age: Option<HumanDuration>,
}
239
/// Newtype around `Duration` that deserializes from a humantime string such as
/// `"1day"` or `"2h 30m"`.
#[derive(Clone, Copy, Debug)]
pub struct HumanDuration(pub Duration);
243
244impl TryFrom<String> for HumanDuration {
245 type Error = String;
246
247 fn try_from(s: String) -> Result<Self, Self::Error> {
248 humantime::parse_duration(&s)
249 .map(HumanDuration)
250 .map_err(|e| format!("invalid duration {:?}: {}", s, e))
251 }
252}
253
254impl<'de> Deserialize<'de> for HumanDuration {
255 fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
256 let s = String::deserialize(d)?;
257 HumanDuration::try_from(s).map_err(serde::de::Error::custom)
258 }
259}
260
261impl schemars::JsonSchema for HumanDuration {
262 fn schema_name() -> Cow<'static, str> {
263 "HumanDuration".into()
264 }
265
266 fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
267 schemars::json_schema!({
268 "type": "string",
269 "description": "A duration string in humantime format, e.g. \"1day\", \"2h 30m\"",
270 "examples": ["1day", "2h 30m"]
271 })
272 }
273}
274
275impl From<BasinConfigSpec> for BasinReconfiguration {
276 fn from(s: BasinConfigSpec) -> Self {
277 BasinReconfiguration {
278 default_stream_config: s
279 .default_stream_config
280 .map(|dsc| Some(StreamReconfiguration::from(dsc)))
281 .map_or(Maybe::Unspecified, Maybe::Specified),
282 stream_cipher: s
283 .stream_cipher
284 .map(|algorithm| Some(algorithm.into()))
285 .map_or(Maybe::Unspecified, Maybe::Specified),
286 create_stream_on_append: s
287 .create_stream_on_append
288 .map_or(Maybe::Unspecified, Maybe::Specified),
289 create_stream_on_read: s
290 .create_stream_on_read
291 .map_or(Maybe::Unspecified, Maybe::Specified),
292 }
293 }
294}
295
296impl From<StreamConfigSpec> for StreamReconfiguration {
297 fn from(s: StreamConfigSpec) -> Self {
298 StreamReconfiguration {
299 storage_class: s
300 .storage_class
301 .map(|sc| Some(StorageClass::from(sc)))
302 .map_or(Maybe::Unspecified, Maybe::Specified),
303 retention_policy: s
304 .retention_policy
305 .map(|rp| Some(rp.0))
306 .map_or(Maybe::Unspecified, Maybe::Specified),
307 timestamping: s
308 .timestamping
309 .map(|ts| {
310 Some(TimestampingReconfiguration {
311 mode: ts
312 .mode
313 .map(|m| Some(TimestampingMode::from(m)))
314 .map_or(Maybe::Unspecified, Maybe::Specified),
315 uncapped: ts
316 .uncapped
317 .map(Some)
318 .map_or(Maybe::Unspecified, Maybe::Specified),
319 })
320 })
321 .map_or(Maybe::Unspecified, Maybe::Specified),
322 delete_on_empty: s
323 .delete_on_empty
324 .map(|doe| {
325 Some(DeleteOnEmptyReconfiguration {
326 min_age: doe
327 .min_age
328 .map(|h| Some(h.0))
329 .map_or(Maybe::Unspecified, Maybe::Specified),
330 })
331 })
332 .map_or(Maybe::Unspecified, Maybe::Specified),
333 }
334 }
335}
336
337pub fn json_schema() -> serde_json::Value {
338 serde_json::to_value(schemars::schema_for!(ResourcesSpec)).unwrap()
339}
340
341pub fn validate(spec: &ResourcesSpec) -> eyre::Result<()> {
342 let mut errors = Vec::new();
343 let mut seen_basins = std::collections::HashSet::new();
344
345 for basin_spec in &spec.basins {
346 if !seen_basins.insert(basin_spec.name.clone()) {
347 errors.push(format!("duplicate basin name {:?}", basin_spec.name));
348 }
349
350 if let Err(e) = basin_spec.name.parse::<BasinName>() {
351 errors.push(format!("invalid basin name {:?}: {}", basin_spec.name, e));
352 continue;
353 }
354
355 let mut seen_streams = std::collections::HashSet::new();
356 for stream_spec in &basin_spec.streams {
357 if !seen_streams.insert(stream_spec.name.clone()) {
358 errors.push(format!(
359 "duplicate stream name {:?} in basin {:?}",
360 stream_spec.name, basin_spec.name
361 ));
362 }
363 if let Err(e) = stream_spec.name.parse::<StreamName>() {
364 errors.push(format!(
365 "invalid stream name {:?} in basin {:?}: {}",
366 stream_spec.name, basin_spec.name, e
367 ));
368 }
369 }
370 }
371
372 if errors.is_empty() {
373 Ok(())
374 } else {
375 Err(eyre::eyre!("{}", errors.join("\n")))
376 }
377}
378
379pub fn load(path: &Path) -> eyre::Result<ResourcesSpec> {
380 let contents = std::fs::read_to_string(path)
381 .map_err(|e| eyre::eyre!("failed to read init file {:?}: {}", path, e))?;
382 let spec: ResourcesSpec = serde_json::from_str(&contents)
383 .map_err(|e| eyre::eyre!("failed to parse init file {:?}: {}", path, e))?;
384 Ok(spec)
385}
386
387pub async fn apply(backend: &Backend, spec: ResourcesSpec) -> eyre::Result<()> {
388 validate(&spec)?;
389
390 for basin_spec in spec.basins {
391 let basin: BasinName = basin_spec
392 .name
393 .parse()
394 .map_err(|e| eyre::eyre!("invalid basin name {:?}: {}", basin_spec.name, e))?;
395
396 let reconfiguration = basin_spec
397 .config
398 .map(BasinReconfiguration::from)
399 .unwrap_or_default();
400
401 backend
402 .create_basin(
403 basin.clone(),
404 CreateBasinIntent::CreateOrReconfigure { reconfiguration },
405 )
406 .await
407 .map_err(|e| eyre::eyre!("failed to apply basin {:?}: {}", basin.as_ref(), e))?;
408
409 info!(basin = basin.as_ref(), "basin applied");
410
411 for stream_spec in basin_spec.streams {
412 let stream: StreamName = stream_spec
413 .name
414 .parse()
415 .map_err(|e| eyre::eyre!("invalid stream name {:?}: {}", stream_spec.name, e))?;
416
417 let reconfiguration = stream_spec
418 .config
419 .map(StreamReconfiguration::from)
420 .unwrap_or_default();
421
422 backend
423 .create_stream(
424 basin.clone(),
425 stream.clone(),
426 CreateStreamIntent::CreateOrReconfigure { reconfiguration },
427 )
428 .await
429 .map_err(|e| {
430 eyre::eyre!(
431 "failed to apply stream {:?}/{:?}: {}",
432 basin.as_ref(),
433 stream.as_ref(),
434 e
435 )
436 })?;
437
438 info!(
439 basin = basin.as_ref(),
440 stream = stream.as_ref(),
441 "stream applied"
442 );
443 }
444 }
445 Ok(())
446}
447
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes a `ResourcesSpec` from JSON, panicking on malformed input.
    fn parse_spec(json: &str) -> ResourcesSpec {
        serde_json::from_str::<ResourcesSpec>(json).expect("valid JSON")
    }

    /// An empty object yields a spec with no basins.
    #[test]
    fn empty_spec() {
        let ResourcesSpec { basins } = parse_spec("{}");
        assert!(basins.is_empty());
    }

    /// A basin given only a name has no config and no streams.
    #[test]
    fn basin_no_config() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin"}]}"#);
        let [basin] = spec.basins.as_slice() else {
            panic!("expected exactly one basin, got {}", spec.basins.len());
        };
        assert_eq!(basin.name, "my-basin");
        assert!(basin.config.is_none());
        assert!(basin.streams.is_empty());
    }

    #[test]
    fn retention_policy_infinite() {
        let parsed =
            serde_json::from_str::<RetentionPolicySpec>(r#""infinite""#).expect("deserialize");
        assert!(matches!(parsed.0, RetentionPolicy::Infinite()));
    }

    #[test]
    fn retention_policy_duration() {
        let parsed =
            serde_json::from_str::<RetentionPolicySpec>(r#""7days""#).expect("deserialize");
        let RetentionPolicy::Age(age) = parsed.0 else {
            panic!("expected an age-based retention policy");
        };
        assert_eq!(age, Duration::from_secs(7 * 24 * 3600));
    }

    #[test]
    fn retention_policy_invalid() {
        let outcome = serde_json::from_str::<RetentionPolicySpec>(r#""not-a-duration""#);
        assert!(outcome.is_err());
    }

    #[test]
    fn human_duration() {
        let HumanDuration(parsed) = serde_json::from_str(r#""1day""#).expect("deserialize");
        assert_eq!(parsed, Duration::from_secs(86400));
    }

    /// Parses a spec exercising every basin and stream field and checks each value.
    #[test]
    fn full_spec_roundtrip() {
        let json = r#"
        {
            "basins": [
                {
                    "name": "my-basin",
                    "config": {
                        "create_stream_on_append": true,
                        "create_stream_on_read": false,
                        "default_stream_config": {
                            "storage_class": "express",
                            "retention_policy": "7days",
                            "timestamping": {
                                "mode": "client-prefer",
                                "uncapped": false
                            },
                            "delete_on_empty": {
                                "min_age": "1day"
                            }
                        }
                    },
                    "streams": [
                        {
                            "name": "events",
                            "config": {
                                "storage_class": "standard",
                                "retention_policy": "infinite"
                            }
                        }
                    ]
                }
            ]
        }"#;

        let spec = parse_spec(json);
        let [basin] = spec.basins.as_slice() else {
            panic!("expected exactly one basin, got {}", spec.basins.len());
        };
        assert_eq!(basin.name, "my-basin");

        // Basin-level settings.
        let basin_config = basin.config.as_ref().expect("basin config");
        assert_eq!(basin_config.create_stream_on_append, Some(true));
        assert_eq!(basin_config.create_stream_on_read, Some(false));

        // Default stream config nested inside the basin config.
        let defaults = basin_config
            .default_stream_config
            .as_ref()
            .expect("default stream config");
        assert!(matches!(
            defaults.storage_class,
            Some(StorageClassSpec::Express)
        ));
        assert!(matches!(
            defaults.retention_policy,
            Some(RetentionPolicySpec(RetentionPolicy::Age(_)))
        ));

        let timestamping = defaults.timestamping.as_ref().expect("timestamping");
        assert!(matches!(
            timestamping.mode,
            Some(TimestampingModeSpec::ClientPrefer)
        ));
        assert_eq!(timestamping.uncapped, Some(false));

        let delete_on_empty = defaults.delete_on_empty.as_ref().expect("delete_on_empty");
        assert_eq!(
            delete_on_empty.min_age.map(|HumanDuration(age)| age),
            Some(Duration::from_secs(86400))
        );

        // The single explicitly declared stream.
        let [stream] = basin.streams.as_slice() else {
            panic!("expected exactly one stream, got {}", basin.streams.len());
        };
        assert_eq!(stream.name, "events");
        let stream_config = stream.config.as_ref().expect("stream config");
        assert!(matches!(
            stream_config.storage_class,
            Some(StorageClassSpec::Standard)
        ));
        assert!(matches!(
            stream_config.retention_policy,
            Some(RetentionPolicySpec(RetentionPolicy::Infinite()))
        ));
    }

    /// Present fields become `Specified`; absent fields stay `Unspecified`.
    #[test]
    fn basin_config_conversion() {
        let spec = BasinConfigSpec {
            create_stream_on_append: Some(true),
            ..BasinConfigSpec::default()
        };
        let converted = BasinReconfiguration::from(spec);
        assert!(matches!(
            converted.create_stream_on_append,
            Maybe::Specified(true)
        ));
        assert!(matches!(converted.create_stream_on_read, Maybe::Unspecified));
        assert!(matches!(converted.default_stream_config, Maybe::Unspecified));
    }

    #[test]
    fn validate_valid_spec() {
        let spec = parse_spec(
            r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"logs"}]}]}"#,
        );
        assert!(validate(&spec).is_ok());
    }

    #[test]
    fn validate_invalid_basin_name() {
        let spec = parse_spec(r#"{"basins":[{"name":"INVALID_BASIN"}]}"#);
        let message = validate(&spec).unwrap_err().to_string();
        assert!(message.contains("invalid basin name"));
    }

    #[test]
    fn validate_invalid_stream_name() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin","streams":[{"name":""}]}]}"#);
        let message = validate(&spec).unwrap_err().to_string();
        assert!(message.contains("invalid stream name"));
    }

    #[test]
    fn validate_duplicate_basin_names() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin"},{"name":"my-basin"}]}"#);
        let message = validate(&spec).unwrap_err().to_string();
        assert!(message.contains("duplicate basin name"));
    }

    #[test]
    fn validate_duplicate_stream_names() {
        let spec = parse_spec(
            r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"events"}]}]}"#,
        );
        let message = validate(&spec).unwrap_err().to_string();
        assert!(message.contains("duplicate stream name"));
    }

    /// A repeated invalid name is reported both as invalid and as a duplicate.
    #[test]
    fn validate_multiple_errors() {
        let spec = parse_spec(r#"{"basins":[{"name":"INVALID"},{"name":"INVALID"}]}"#);
        let message = validate(&spec).unwrap_err().to_string();
        assert!(message.contains("invalid basin name"));
        assert!(message.contains("duplicate basin name"));
    }

    /// The generated schema is a draft 2020-12 object exposing `basins`.
    #[test]
    fn json_schema_is_valid() {
        let schema = json_schema();
        assert!(schema.is_object());
        let root = schema.as_object().unwrap();

        // The schema must declare the JSON Schema draft it targets.
        assert_eq!(
            root.get("$schema").and_then(|value| value.as_str()),
            Some("https://json-schema.org/draft/2020-12/schema")
        );

        assert!(
            root.contains_key("properties"),
            "schema should have root properties"
        );

        assert!(
            root.contains_key("$defs"),
            "schema should have $defs for reusable definitions"
        );

        let properties = root
            .get("properties")
            .and_then(|value| value.as_object())
            .unwrap();
        assert!(
            properties.contains_key("basins"),
            "schema should include the `basins` property"
        );
    }

    /// Present fields become `Specified(Some(..))`; absent fields stay `Unspecified`.
    #[test]
    fn stream_config_conversion() {
        let spec = StreamConfigSpec {
            storage_class: Some(StorageClassSpec::Standard),
            retention_policy: Some(RetentionPolicySpec(RetentionPolicy::Infinite())),
            ..StreamConfigSpec::default()
        };
        let converted = StreamReconfiguration::from(spec);
        assert!(matches!(
            converted.storage_class,
            Maybe::Specified(Some(StorageClass::Standard))
        ));
        assert!(matches!(
            converted.retention_policy,
            Maybe::Specified(Some(RetentionPolicy::Infinite()))
        ));
        assert!(matches!(converted.timestamping, Maybe::Unspecified));
        assert!(matches!(converted.delete_on_empty, Maybe::Unspecified));
    }
}