1use std::{borrow::Cow, path::Path, time::Duration};
6
7use s2_common::{
8 maybe::Maybe,
9 types::{
10 basin::BasinName,
11 config::{
12 BasinReconfiguration, DeleteOnEmptyReconfiguration, RetentionPolicy, StorageClass,
13 StreamReconfiguration, TimestampingMode, TimestampingReconfiguration,
14 },
15 resources::CreateMode,
16 stream::StreamName,
17 },
18};
19use serde::{Deserialize, Serialize};
20use tracing::info;
21
22use crate::backend::Backend;
23
// Root of a declarative resources spec: the list of basins (each with its
// nested streams) that `apply` creates or reconfigures.
//
// NOTE(review): unlike the nested spec types, this struct does not set
// `deny_unknown_fields`, so unrecognized top-level keys are ignored —
// presumably deliberate (e.g. to tolerate a `$schema` key); confirm.
#[derive(Debug, Deserialize, Default, schemars::JsonSchema)]
pub struct ResourcesSpec {
    // Basins to apply; an absent `basins` key deserializes to an empty list.
    #[serde(default)]
    pub basins: Vec<BasinSpec>,
}
29
// One basin entry in the spec, together with the streams declared under it.
// Unknown keys are rejected at parse time (`deny_unknown_fields`).
#[derive(Debug, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct BasinSpec {
    // Basin name exactly as written in the spec; it is kept as a raw string
    // here and parsed into a `BasinName` by `validate` and `apply`.
    pub name: String,
    // Optional basin configuration; when absent, `apply` uses
    // `BasinReconfiguration::default()`.
    #[serde(default)]
    pub config: Option<BasinConfigSpec>,
    // Streams to apply inside this basin; defaults to an empty list.
    #[serde(default)]
    pub streams: Vec<StreamSpec>,
}
39
// One stream entry nested under a basin in the spec.
// Unknown keys are rejected at parse time (`deny_unknown_fields`).
#[derive(Debug, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct StreamSpec {
    // Stream name exactly as written in the spec; parsed into a `StreamName`
    // by `validate` and `apply`.
    pub name: String,
    // Optional stream configuration; when absent, `apply` uses
    // `StreamReconfiguration::default()`.
    #[serde(default)]
    pub config: Option<StreamConfigSpec>,
}
47
// Basin-level configuration as written in the spec. Every field is optional;
// a field left out of the spec is converted to `Maybe::Unspecified` by the
// `From<BasinConfigSpec> for BasinReconfiguration` impl.
#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct BasinConfigSpec {
    // Stream configuration carried at basin level.
    // NOTE(review): presumably the defaults for streams created in this basin — confirm.
    #[serde(default)]
    pub default_stream_config: Option<StreamConfigSpec>,
    // Encryption algorithm to apply to newly created streams in the basin.
    #[serde(default)]
    pub stream_cipher: Option<EncryptionAlgorithmSpec>,
    // NOTE(review): presumably enables auto-creating a stream on append — confirm.
    #[serde(default)]
    pub create_stream_on_append: Option<bool>,
    // NOTE(review): presumably enables auto-creating a stream on read — confirm.
    #[serde(default)]
    pub create_stream_on_read: Option<bool>,
}
63
// Stream-level configuration as written in the spec. Every field is optional;
// a field left out of the spec is converted to `Maybe::Unspecified` by the
// `From<StreamConfigSpec> for StreamReconfiguration` impl.
#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct StreamConfigSpec {
    // Storage class for recent writes ("standard" or "express").
    #[serde(default)]
    pub storage_class: Option<StorageClassSpec>,
    // Either "infinite" or a humantime duration string such as "7days".
    #[serde(default)]
    pub retention_policy: Option<RetentionPolicySpec>,
    // Timestamping settings (mode and the `uncapped` flag).
    #[serde(default)]
    pub timestamping: Option<TimestampingSpec>,
    // Delete-on-empty settings (currently only `min_age`).
    #[serde(default)]
    pub delete_on_empty: Option<DeleteOnEmptySpec>,
}
81
/// Storage class for recent writes, as spelled in the spec.
///
/// Variants are (de)serialized in kebab-case: `"standard"` and `"express"`.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum StorageClassSpec {
    /// Spelled `"standard"`; converts to `StorageClass::Standard`.
    Standard,
    /// Spelled `"express"`; converts to `StorageClass::Express`.
    Express,
}
88
89impl schemars::JsonSchema for StorageClassSpec {
90 fn schema_name() -> Cow<'static, str> {
91 "StorageClassSpec".into()
92 }
93
94 fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
95 schemars::json_schema!({
96 "type": "string",
97 "description": "Storage class for recent writes.",
98 "enum": ["standard", "express"]
99 })
100 }
101}
102
103impl From<StorageClassSpec> for StorageClass {
104 fn from(s: StorageClassSpec) -> Self {
105 match s {
106 StorageClassSpec::Standard => StorageClass::Standard,
107 StorageClassSpec::Express => StorageClass::Express,
108 }
109 }
110}
111
/// Encryption algorithm to apply to newly created streams in a basin, as
/// spelled in the spec.
///
/// Each variant carries an explicit serde rename because the wire spelling
/// (`"aegis-256"`, `"aes-256-gcm"`) is set per variant rather than by a
/// container-level `rename_all`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub enum EncryptionAlgorithmSpec {
    /// Spelled `"aegis-256"` in the spec.
    #[serde(rename = "aegis-256")]
    Aegis256,
    /// Spelled `"aes-256-gcm"` in the spec.
    #[serde(rename = "aes-256-gcm")]
    Aes256Gcm,
}
119
120impl schemars::JsonSchema for EncryptionAlgorithmSpec {
121 fn schema_name() -> Cow<'static, str> {
122 "EncryptionAlgorithmSpec".into()
123 }
124
125 fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
126 schemars::json_schema!({
127 "type": "string",
128 "description": "Encryption algorithm to apply to newly created streams in the basin.",
129 "enum": ["aegis-256", "aes-256-gcm"]
130 })
131 }
132}
133
134impl From<EncryptionAlgorithmSpec> for s2_common::encryption::EncryptionAlgorithm {
135 fn from(m: EncryptionAlgorithmSpec) -> Self {
136 match m {
137 EncryptionAlgorithmSpec::Aegis256 => Self::Aegis256,
138 EncryptionAlgorithmSpec::Aes256Gcm => Self::Aes256Gcm,
139 }
140 }
141}
142
/// Newtype around [`RetentionPolicy`] that deserializes from a string:
/// `"infinite"` (case-insensitive) or a humantime duration such as `"7days"`.
#[derive(Debug, Clone, Copy)]
pub struct RetentionPolicySpec(pub RetentionPolicy);
146
147impl RetentionPolicySpec {
148 pub fn age_secs(self) -> Option<u64> {
149 self.0.age().map(|d| d.as_secs())
150 }
151}
152
153impl TryFrom<String> for RetentionPolicySpec {
154 type Error = String;
155
156 fn try_from(s: String) -> Result<Self, Self::Error> {
157 if s.eq_ignore_ascii_case("infinite") {
158 return Ok(RetentionPolicySpec(RetentionPolicy::Infinite()));
159 }
160 let d = humantime::parse_duration(&s)
161 .map_err(|e| format!("invalid retention_policy {:?}: {}", s, e))?;
162 Ok(RetentionPolicySpec(RetentionPolicy::Age(d)))
163 }
164}
165
166impl<'de> Deserialize<'de> for RetentionPolicySpec {
167 fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
168 let s = String::deserialize(d)?;
169 RetentionPolicySpec::try_from(s).map_err(serde::de::Error::custom)
170 }
171}
172
173impl schemars::JsonSchema for RetentionPolicySpec {
174 fn schema_name() -> Cow<'static, str> {
175 "RetentionPolicySpec".into()
176 }
177
178 fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
179 schemars::json_schema!({
180 "type": "string",
181 "description": "Retain records unless explicitly trimmed (\"infinite\"), or automatically \
182 trim records older than the given duration (e.g. \"7days\", \"1week\").",
183 "examples": ["infinite", "7days", "1week"]
184 })
185 }
186}
187
// Timestamping settings for a stream as written in the spec. Both fields are
// optional; an omitted field converts to `Maybe::Unspecified`.
// Unknown keys are rejected at parse time (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct TimestampingSpec {
    // Timestamping mode for appends ("client-prefer", "client-require" or "arrival").
    #[serde(default)]
    pub mode: Option<TimestampingModeSpec>,
    // NOTE(review): presumably lifts a cap on client-supplied timestamps — confirm.
    #[serde(default)]
    pub uncapped: Option<bool>,
}
199
/// Timestamping mode for appends, as spelled in the spec.
///
/// Variants are (de)serialized in kebab-case: `"client-prefer"`,
/// `"client-require"` and `"arrival"`. Each converts to the same-named
/// `TimestampingMode` variant.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum TimestampingModeSpec {
    /// Spelled `"client-prefer"`.
    ClientPrefer,
    /// Spelled `"client-require"`.
    ClientRequire,
    /// Spelled `"arrival"`.
    Arrival,
}
207
208impl schemars::JsonSchema for TimestampingModeSpec {
209 fn schema_name() -> Cow<'static, str> {
210 "TimestampingModeSpec".into()
211 }
212
213 fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
214 schemars::json_schema!({
215 "type": "string",
216 "description": "Timestamping mode for appends that influences how timestamps are handled.",
217 "enum": ["client-prefer", "client-require", "arrival"]
218 })
219 }
220}
221
222impl From<TimestampingModeSpec> for TimestampingMode {
223 fn from(m: TimestampingModeSpec) -> Self {
224 match m {
225 TimestampingModeSpec::ClientPrefer => TimestampingMode::ClientPrefer,
226 TimestampingModeSpec::ClientRequire => TimestampingMode::ClientRequire,
227 TimestampingModeSpec::Arrival => TimestampingMode::Arrival,
228 }
229 }
230}
231
// Delete-on-empty settings for a stream as written in the spec.
// Unknown keys are rejected at parse time (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct DeleteOnEmptySpec {
    // Humantime duration string such as "1day"; omitted converts to `Maybe::Unspecified`.
    // NOTE(review): presumably the minimum age before an empty stream may be deleted — confirm.
    #[serde(default)]
    pub min_age: Option<HumanDuration>,
}
240
/// Newtype around [`Duration`] that deserializes from a humantime string
/// such as `"1day"` or `"2h 30m"`.
#[derive(Debug, Clone, Copy)]
pub struct HumanDuration(pub Duration);
244
245impl TryFrom<String> for HumanDuration {
246 type Error = String;
247
248 fn try_from(s: String) -> Result<Self, Self::Error> {
249 humantime::parse_duration(&s)
250 .map(HumanDuration)
251 .map_err(|e| format!("invalid duration {:?}: {}", s, e))
252 }
253}
254
255impl<'de> Deserialize<'de> for HumanDuration {
256 fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
257 let s = String::deserialize(d)?;
258 HumanDuration::try_from(s).map_err(serde::de::Error::custom)
259 }
260}
261
262impl schemars::JsonSchema for HumanDuration {
263 fn schema_name() -> Cow<'static, str> {
264 "HumanDuration".into()
265 }
266
267 fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
268 schemars::json_schema!({
269 "type": "string",
270 "description": "A duration string in humantime format, e.g. \"1day\", \"2h 30m\"",
271 "examples": ["1day", "2h 30m"]
272 })
273 }
274}
275
276impl From<BasinConfigSpec> for BasinReconfiguration {
277 fn from(s: BasinConfigSpec) -> Self {
278 BasinReconfiguration {
279 default_stream_config: s
280 .default_stream_config
281 .map(|dsc| Some(StreamReconfiguration::from(dsc)))
282 .map_or(Maybe::Unspecified, Maybe::Specified),
283 stream_cipher: s
284 .stream_cipher
285 .map(|algorithm| Some(algorithm.into()))
286 .map_or(Maybe::Unspecified, Maybe::Specified),
287 create_stream_on_append: s
288 .create_stream_on_append
289 .map_or(Maybe::Unspecified, Maybe::Specified),
290 create_stream_on_read: s
291 .create_stream_on_read
292 .map_or(Maybe::Unspecified, Maybe::Specified),
293 }
294 }
295}
296
297impl From<StreamConfigSpec> for StreamReconfiguration {
298 fn from(s: StreamConfigSpec) -> Self {
299 StreamReconfiguration {
300 storage_class: s
301 .storage_class
302 .map(|sc| Some(StorageClass::from(sc)))
303 .map_or(Maybe::Unspecified, Maybe::Specified),
304 retention_policy: s
305 .retention_policy
306 .map(|rp| Some(rp.0))
307 .map_or(Maybe::Unspecified, Maybe::Specified),
308 timestamping: s
309 .timestamping
310 .map(|ts| {
311 Some(TimestampingReconfiguration {
312 mode: ts
313 .mode
314 .map(|m| Some(TimestampingMode::from(m)))
315 .map_or(Maybe::Unspecified, Maybe::Specified),
316 uncapped: ts
317 .uncapped
318 .map(Some)
319 .map_or(Maybe::Unspecified, Maybe::Specified),
320 })
321 })
322 .map_or(Maybe::Unspecified, Maybe::Specified),
323 delete_on_empty: s
324 .delete_on_empty
325 .map(|doe| {
326 Some(DeleteOnEmptyReconfiguration {
327 min_age: doe
328 .min_age
329 .map(|h| Some(h.0))
330 .map_or(Maybe::Unspecified, Maybe::Specified),
331 })
332 })
333 .map_or(Maybe::Unspecified, Maybe::Specified),
334 }
335 }
336}
337
338pub fn json_schema() -> serde_json::Value {
339 serde_json::to_value(schemars::schema_for!(ResourcesSpec)).unwrap()
340}
341
342pub fn validate(spec: &ResourcesSpec) -> eyre::Result<()> {
343 let mut errors = Vec::new();
344 let mut seen_basins = std::collections::HashSet::new();
345
346 for basin_spec in &spec.basins {
347 if !seen_basins.insert(basin_spec.name.clone()) {
348 errors.push(format!("duplicate basin name {:?}", basin_spec.name));
349 }
350
351 if let Err(e) = basin_spec.name.parse::<BasinName>() {
352 errors.push(format!("invalid basin name {:?}: {}", basin_spec.name, e));
353 continue;
354 }
355
356 let mut seen_streams = std::collections::HashSet::new();
357 for stream_spec in &basin_spec.streams {
358 if !seen_streams.insert(stream_spec.name.clone()) {
359 errors.push(format!(
360 "duplicate stream name {:?} in basin {:?}",
361 stream_spec.name, basin_spec.name
362 ));
363 }
364 if let Err(e) = stream_spec.name.parse::<StreamName>() {
365 errors.push(format!(
366 "invalid stream name {:?} in basin {:?}: {}",
367 stream_spec.name, basin_spec.name, e
368 ));
369 }
370 }
371 }
372
373 if errors.is_empty() {
374 Ok(())
375 } else {
376 Err(eyre::eyre!("{}", errors.join("\n")))
377 }
378}
379
380pub fn load(path: &Path) -> eyre::Result<ResourcesSpec> {
381 let contents = std::fs::read_to_string(path)
382 .map_err(|e| eyre::eyre!("failed to read init file {:?}: {}", path, e))?;
383 let spec: ResourcesSpec = serde_json::from_str(&contents)
384 .map_err(|e| eyre::eyre!("failed to parse init file {:?}: {}", path, e))?;
385 Ok(spec)
386}
387
388pub async fn apply(backend: &Backend, spec: ResourcesSpec) -> eyre::Result<()> {
389 validate(&spec)?;
390
391 for basin_spec in spec.basins {
392 let basin: BasinName = basin_spec
393 .name
394 .parse()
395 .map_err(|e| eyre::eyre!("invalid basin name {:?}: {}", basin_spec.name, e))?;
396
397 let reconfiguration = basin_spec
398 .config
399 .map(BasinReconfiguration::from)
400 .unwrap_or_default();
401
402 backend
403 .create_basin(
404 basin.clone(),
405 reconfiguration,
406 CreateMode::CreateOrReconfigure,
407 )
408 .await
409 .map_err(|e| eyre::eyre!("failed to apply basin {:?}: {}", basin.as_ref(), e))?;
410
411 info!(basin = basin.as_ref(), "basin applied");
412
413 for stream_spec in basin_spec.streams {
414 let stream: StreamName = stream_spec
415 .name
416 .parse()
417 .map_err(|e| eyre::eyre!("invalid stream name {:?}: {}", stream_spec.name, e))?;
418
419 let reconfiguration = stream_spec
420 .config
421 .map(StreamReconfiguration::from)
422 .unwrap_or_default();
423
424 backend
425 .create_stream(
426 basin.clone(),
427 stream.clone(),
428 reconfiguration,
429 CreateMode::CreateOrReconfigure,
430 )
431 .await
432 .map_err(|e| {
433 eyre::eyre!(
434 "failed to apply stream {:?}/{:?}: {}",
435 basin.as_ref(),
436 stream.as_ref(),
437 e
438 )
439 })?;
440
441 info!(
442 basin = basin.as_ref(),
443 stream = stream.as_ref(),
444 "stream applied"
445 );
446 }
447 }
448 Ok(())
449}
450
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses a JSON document into a `ResourcesSpec`, panicking on bad input.
    fn parse_spec(json: &str) -> ResourcesSpec {
        serde_json::from_str::<ResourcesSpec>(json).expect("valid JSON")
    }

    // ---- parsing ----

    #[test]
    fn empty_spec() {
        let parsed = parse_spec("{}");
        assert!(parsed.basins.is_empty());
    }

    #[test]
    fn basin_no_config() {
        let parsed = parse_spec(r#"{"basins":[{"name":"my-basin"}]}"#);
        assert_eq!(parsed.basins.len(), 1);

        let only_basin = &parsed.basins[0];
        assert_eq!(only_basin.name, "my-basin");
        assert!(only_basin.config.is_none());
        assert!(only_basin.streams.is_empty());
    }

    #[test]
    fn retention_policy_infinite() {
        let parsed: RetentionPolicySpec =
            serde_json::from_str(r#""infinite""#).expect("deserialize");
        assert!(matches!(parsed.0, RetentionPolicy::Infinite()));
    }

    #[test]
    fn retention_policy_duration() {
        let parsed: RetentionPolicySpec = serde_json::from_str(r#""7days""#).expect("deserialize");
        let policy = parsed.0;
        assert!(matches!(policy, RetentionPolicy::Age(_)));
        if let RetentionPolicy::Age(age) = policy {
            assert_eq!(age, Duration::from_secs(7 * 24 * 3600));
        }
    }

    #[test]
    fn retention_policy_invalid() {
        let outcome = serde_json::from_str::<RetentionPolicySpec>(r#""not-a-duration""#);
        assert!(outcome.is_err());
    }

    #[test]
    fn human_duration() {
        let HumanDuration(parsed) =
            serde_json::from_str::<HumanDuration>(r#""1day""#).expect("deserialize");
        assert_eq!(parsed, Duration::from_secs(86400));
    }

    #[test]
    fn full_spec_roundtrip() {
        let json = r#"
        {
            "basins": [
                {
                    "name": "my-basin",
                    "config": {
                        "create_stream_on_append": true,
                        "create_stream_on_read": false,
                        "default_stream_config": {
                            "storage_class": "express",
                            "retention_policy": "7days",
                            "timestamping": {
                                "mode": "client-prefer",
                                "uncapped": false
                            },
                            "delete_on_empty": {
                                "min_age": "1day"
                            }
                        }
                    },
                    "streams": [
                        {
                            "name": "events",
                            "config": {
                                "storage_class": "standard",
                                "retention_policy": "infinite"
                            }
                        }
                    ]
                }
            ]
        }"#;

        let parsed = parse_spec(json);
        assert_eq!(parsed.basins.len(), 1);
        let basin = &parsed.basins[0];
        assert_eq!(basin.name, "my-basin");

        // Basin-level flags.
        let basin_cfg = basin.config.as_ref().unwrap();
        assert_eq!(basin_cfg.create_stream_on_append, Some(true));
        assert_eq!(basin_cfg.create_stream_on_read, Some(false));

        // Default stream config nested in the basin config.
        let defaults = basin_cfg.default_stream_config.as_ref().unwrap();
        assert!(matches!(
            defaults.storage_class,
            Some(StorageClassSpec::Express)
        ));
        assert!(matches!(
            defaults.retention_policy.as_ref().map(|r| &r.0),
            Some(RetentionPolicy::Age(_))
        ));

        let timestamping = defaults.timestamping.as_ref().unwrap();
        assert!(matches!(
            timestamping.mode,
            Some(TimestampingModeSpec::ClientPrefer)
        ));
        assert_eq!(timestamping.uncapped, Some(false));

        let delete_on_empty = defaults.delete_on_empty.as_ref().unwrap();
        assert_eq!(
            delete_on_empty.min_age.as_ref().map(|h| h.0),
            Some(Duration::from_secs(86400))
        );

        // Explicitly declared stream.
        assert_eq!(basin.streams.len(), 1);
        let stream = &basin.streams[0];
        assert_eq!(stream.name, "events");
        let stream_cfg = stream.config.as_ref().unwrap();
        assert!(matches!(
            stream_cfg.storage_class,
            Some(StorageClassSpec::Standard)
        ));
        assert!(matches!(
            stream_cfg.retention_policy.as_ref().map(|r| &r.0),
            Some(RetentionPolicy::Infinite())
        ));
    }

    // ---- conversions ----

    #[test]
    fn basin_config_conversion() {
        let reconfig = BasinReconfiguration::from(BasinConfigSpec {
            default_stream_config: None,
            stream_cipher: None,
            create_stream_on_append: Some(true),
            create_stream_on_read: None,
        });

        assert!(matches!(
            reconfig.create_stream_on_append,
            Maybe::Specified(true)
        ));
        assert!(matches!(reconfig.create_stream_on_read, Maybe::Unspecified));
        assert!(matches!(reconfig.default_stream_config, Maybe::Unspecified));
    }

    // ---- validation ----

    #[test]
    fn validate_valid_spec() {
        let parsed = parse_spec(
            r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"logs"}]}]}"#,
        );
        assert!(validate(&parsed).is_ok());
    }

    #[test]
    fn validate_invalid_basin_name() {
        let parsed = parse_spec(r#"{"basins":[{"name":"INVALID_BASIN"}]}"#);
        let message = validate(&parsed).unwrap_err().to_string();
        assert!(message.contains("invalid basin name"));
    }

    #[test]
    fn validate_invalid_stream_name() {
        let parsed = parse_spec(r#"{"basins":[{"name":"my-basin","streams":[{"name":""}]}]}"#);
        let message = validate(&parsed).unwrap_err().to_string();
        assert!(message.contains("invalid stream name"));
    }

    #[test]
    fn validate_duplicate_basin_names() {
        let parsed = parse_spec(r#"{"basins":[{"name":"my-basin"},{"name":"my-basin"}]}"#);
        let message = validate(&parsed).unwrap_err().to_string();
        assert!(message.contains("duplicate basin name"));
    }

    #[test]
    fn validate_duplicate_stream_names() {
        let parsed = parse_spec(
            r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"events"}]}]}"#,
        );
        let message = validate(&parsed).unwrap_err().to_string();
        assert!(message.contains("duplicate stream name"));
    }

    #[test]
    fn validate_multiple_errors() {
        let parsed = parse_spec(r#"{"basins":[{"name":"INVALID"},{"name":"INVALID"}]}"#);
        let message = validate(&parsed).unwrap_err().to_string();
        assert!(message.contains("invalid basin name"));
        assert!(message.contains("duplicate basin name"));
    }

    // ---- schema ----

    #[test]
    fn json_schema_is_valid() {
        let schema = json_schema();
        assert!(schema.is_object());
        let root = schema.as_object().unwrap();

        // The generated schema must declare the 2020-12 draft.
        let expected_draft =
            serde_json::Value::String("https://json-schema.org/draft/2020-12/schema".to_string());
        assert_eq!(root.get("$schema"), Some(&expected_draft));

        assert!(
            root.contains_key("properties"),
            "schema should have root properties"
        );

        assert!(
            root.contains_key("$defs"),
            "schema should have $defs for reusable definitions"
        );

        let properties = root.get("properties").unwrap().as_object().unwrap();
        assert!(
            properties.contains_key("basins"),
            "schema should include the `basins` property"
        );
    }

    #[test]
    fn stream_config_conversion() {
        let reconfig = StreamReconfiguration::from(StreamConfigSpec {
            storage_class: Some(StorageClassSpec::Standard),
            retention_policy: Some(RetentionPolicySpec(RetentionPolicy::Infinite())),
            timestamping: None,
            delete_on_empty: None,
        });

        assert!(matches!(
            reconfig.storage_class,
            Maybe::Specified(Some(StorageClass::Standard))
        ));
        assert!(matches!(
            reconfig.retention_policy,
            Maybe::Specified(Some(RetentionPolicy::Infinite()))
        ));
        assert!(matches!(reconfig.timestamping, Maybe::Unspecified));
        assert!(matches!(reconfig.delete_on_empty, Maybe::Unspecified));
    }
}