1use std::{borrow::Cow, path::Path, time::Duration};
6
7use s2_common::types::{
8 basin::BasinName,
9 config::{
10 BasinConfig, OptionalDeleteOnEmptyConfig, OptionalStreamConfig, OptionalTimestampingConfig,
11 RetentionPolicy, StorageClass, TimestampingMode,
12 },
13 resources::ProvisionMode,
14 stream::StreamName,
15};
16use serde::{Deserialize, Serialize};
17use tracing::info;
18
19use crate::backend::Backend;
20
// Root of the resources spec document: the basins (and their streams) that
// `apply` provisions.
//
// Plain `//` comments are used on schema-derived types because schemars turns
// `///` doc comments into `description` entries of the generated JSON Schema.
//
// NOTE(review): unlike the nested spec types, this struct does not set
// `#[serde(deny_unknown_fields)]`, so unknown top-level keys are silently
// ignored — presumably so a `"$schema"` key can be present for editor tooling;
// confirm this is intentional.
#[derive(Debug, Deserialize, Default, schemars::JsonSchema)]
pub struct ResourcesSpec {
    // Basins to provision; a missing `basins` key deserializes to an empty list.
    #[serde(default)]
    pub basins: Vec<BasinSpec>,
}
26
// One basin to provision, with its optional configuration and its streams.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct BasinSpec {
    // Basin name; `validate` requires it to parse as a `BasinName` and to be
    // unique across the spec.
    pub name: String,
    // Basin configuration; when omitted, `apply` uses `BasinConfig::default()`.
    #[serde(default)]
    pub config: Option<BasinConfigSpec>,
    // Streams to provision inside this basin; `validate` requires their names
    // to be unique within the basin.
    #[serde(default)]
    pub streams: Vec<StreamSpec>,
}
36
// One stream to provision inside its enclosing basin.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct StreamSpec {
    // Stream name; `validate` requires it to parse as a `StreamName`.
    pub name: String,
    // Stream configuration; when omitted, `apply` uses
    // `OptionalStreamConfig::default()`.
    #[serde(default)]
    pub config: Option<StreamConfigSpec>,
}
44
// Basin-level configuration as written in the spec. Every field is optional;
// `From<BasinConfigSpec> for BasinConfig` fills omitted fields with defaults.
#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct BasinConfigSpec {
    // Default configuration for streams in this basin; omitted means
    // `OptionalStreamConfig::default()`.
    #[serde(default)]
    pub default_stream_config: Option<StreamConfigSpec>,
    // Encryption algorithm to apply to newly created streams in the basin;
    // omitted means no cipher is set.
    #[serde(default)]
    pub stream_cipher: Option<EncryptionAlgorithmSpec>,
    // Maps to `BasinConfig::create_stream_on_append`; `false` when omitted.
    #[serde(default)]
    pub create_stream_on_append: Option<bool>,
    // Maps to `BasinConfig::create_stream_on_read`; `false` when omitted.
    #[serde(default)]
    pub create_stream_on_read: Option<bool>,
}
60
// Stream-level configuration as written in the spec. Converted into an
// `OptionalStreamConfig`, where omitted fields stay unset or take their defaults.
#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct StreamConfigSpec {
    // Storage class: `"standard"` or `"express"`.
    #[serde(default)]
    pub storage_class: Option<StorageClassSpec>,
    // Retention policy: `"infinite"` or a humantime duration such as `"7days"`.
    #[serde(default)]
    pub retention_policy: Option<RetentionPolicySpec>,
    // Timestamping settings; omitted means `OptionalTimestampingConfig::default()`.
    #[serde(default)]
    pub timestamping: Option<TimestampingSpec>,
    // Delete-on-empty settings; omitted means `OptionalDeleteOnEmptyConfig::default()`.
    #[serde(default)]
    pub delete_on_empty: Option<DeleteOnEmptySpec>,
}
78
79#[derive(Debug, Clone, Deserialize, Serialize)]
80#[serde(rename_all = "kebab-case")]
81pub enum StorageClassSpec {
82 Standard,
83 Express,
84}
85
86impl schemars::JsonSchema for StorageClassSpec {
87 fn schema_name() -> Cow<'static, str> {
88 "StorageClassSpec".into()
89 }
90
91 fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
92 schemars::json_schema!({
93 "type": "string",
94 "description": "Storage class for recent writes.",
95 "enum": ["standard", "express"]
96 })
97 }
98}
99
100impl From<StorageClassSpec> for StorageClass {
101 fn from(s: StorageClassSpec) -> Self {
102 match s {
103 StorageClassSpec::Standard => StorageClass::Standard,
104 StorageClassSpec::Express => StorageClass::Express,
105 }
106 }
107}
108
/// Stream cipher selectable in a basin config (`"aegis-256"` or `"aes-256-gcm"`);
/// converted into `s2_common::encryption::EncryptionAlgorithm`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub enum EncryptionAlgorithmSpec {
    // Explicit per-variant renames keep the hyphenated wire names
    // ("aegis-256", "aes-256-gcm") used by the hand-written JSON Schema.
    #[serde(rename = "aegis-256")]
    Aegis256,
    #[serde(rename = "aes-256-gcm")]
    Aes256Gcm,
}
116
117impl schemars::JsonSchema for EncryptionAlgorithmSpec {
118 fn schema_name() -> Cow<'static, str> {
119 "EncryptionAlgorithmSpec".into()
120 }
121
122 fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
123 schemars::json_schema!({
124 "type": "string",
125 "description": "Encryption algorithm to apply to newly created streams in the basin.",
126 "enum": ["aegis-256", "aes-256-gcm"]
127 })
128 }
129}
130
131impl From<EncryptionAlgorithmSpec> for s2_common::encryption::EncryptionAlgorithm {
132 fn from(m: EncryptionAlgorithmSpec) -> Self {
133 match m {
134 EncryptionAlgorithmSpec::Aegis256 => Self::Aegis256,
135 EncryptionAlgorithmSpec::Aes256Gcm => Self::Aes256Gcm,
136 }
137 }
138}
139
/// Wrapper around [`RetentionPolicy`] that deserializes from a string: either
/// `"infinite"` (matched case-insensitively) or a humantime duration such as
/// `"7days"`, which becomes `RetentionPolicy::Age`.
#[derive(Debug, Clone, Copy)]
pub struct RetentionPolicySpec(pub RetentionPolicy);
143
144impl RetentionPolicySpec {
145 pub fn age_secs(self) -> Option<u64> {
146 self.0.age().map(|d| d.as_secs())
147 }
148}
149
150impl TryFrom<String> for RetentionPolicySpec {
151 type Error = String;
152
153 fn try_from(s: String) -> Result<Self, Self::Error> {
154 if s.eq_ignore_ascii_case("infinite") {
155 return Ok(RetentionPolicySpec(RetentionPolicy::Infinite()));
156 }
157 let d = humantime::parse_duration(&s)
158 .map_err(|e| format!("invalid retention_policy {:?}: {}", s, e))?;
159 Ok(RetentionPolicySpec(RetentionPolicy::Age(d)))
160 }
161}
162
163impl<'de> Deserialize<'de> for RetentionPolicySpec {
164 fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
165 let s = String::deserialize(d)?;
166 RetentionPolicySpec::try_from(s).map_err(serde::de::Error::custom)
167 }
168}
169
170impl schemars::JsonSchema for RetentionPolicySpec {
171 fn schema_name() -> Cow<'static, str> {
172 "RetentionPolicySpec".into()
173 }
174
175 fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
176 schemars::json_schema!({
177 "type": "string",
178 "description": "Retain records unless explicitly trimmed (\"infinite\"), or automatically \
179 trim records older than the given duration (e.g. \"7days\", \"1week\").",
180 "examples": ["infinite", "7days", "1week"]
181 })
182 }
183}
184
// Timestamping settings for a stream; converted into `OptionalTimestampingConfig`.
#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct TimestampingSpec {
    // Mode: `"client-prefer"`, `"client-require"`, or `"arrival"`.
    #[serde(default)]
    pub mode: Option<TimestampingModeSpec>,
    // Passed through unchanged to `OptionalTimestampingConfig::uncapped`.
    #[serde(default)]
    pub uncapped: Option<bool>,
}
196
197#[derive(Debug, Clone, Deserialize, Serialize)]
198#[serde(rename_all = "kebab-case")]
199pub enum TimestampingModeSpec {
200 ClientPrefer,
201 ClientRequire,
202 Arrival,
203}
204
205impl schemars::JsonSchema for TimestampingModeSpec {
206 fn schema_name() -> Cow<'static, str> {
207 "TimestampingModeSpec".into()
208 }
209
210 fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
211 schemars::json_schema!({
212 "type": "string",
213 "description": "Timestamping mode for appends that influences how timestamps are handled.",
214 "enum": ["client-prefer", "client-require", "arrival"]
215 })
216 }
217}
218
219impl From<TimestampingModeSpec> for TimestampingMode {
220 fn from(m: TimestampingModeSpec) -> Self {
221 match m {
222 TimestampingModeSpec::ClientPrefer => TimestampingMode::ClientPrefer,
223 TimestampingModeSpec::ClientRequire => TimestampingMode::ClientRequire,
224 TimestampingModeSpec::Arrival => TimestampingMode::Arrival,
225 }
226 }
227}
228
// Delete-on-empty settings for a stream; converted into `OptionalDeleteOnEmptyConfig`.
#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct DeleteOnEmptySpec {
    // Minimum age as a humantime string (e.g. `"1day"`); maps to
    // `OptionalDeleteOnEmptyConfig::min_age`.
    #[serde(default)]
    pub min_age: Option<HumanDuration>,
}
237
/// A [`Duration`] that deserializes from a humantime string such as `"1day"`
/// or `"2h 30m"`.
#[derive(Debug, Clone, Copy)]
pub struct HumanDuration(pub Duration);
241
242impl TryFrom<String> for HumanDuration {
243 type Error = String;
244
245 fn try_from(s: String) -> Result<Self, Self::Error> {
246 humantime::parse_duration(&s)
247 .map(HumanDuration)
248 .map_err(|e| format!("invalid duration {:?}: {}", s, e))
249 }
250}
251
252impl<'de> Deserialize<'de> for HumanDuration {
253 fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
254 let s = String::deserialize(d)?;
255 HumanDuration::try_from(s).map_err(serde::de::Error::custom)
256 }
257}
258
259impl schemars::JsonSchema for HumanDuration {
260 fn schema_name() -> Cow<'static, str> {
261 "HumanDuration".into()
262 }
263
264 fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
265 schemars::json_schema!({
266 "type": "string",
267 "description": "A duration string in humantime format, e.g. \"1day\", \"2h 30m\"",
268 "examples": ["1day", "2h 30m"]
269 })
270 }
271}
272
273impl From<BasinConfigSpec> for BasinConfig {
274 fn from(s: BasinConfigSpec) -> Self {
275 BasinConfig {
276 default_stream_config: s.default_stream_config.map(Into::into).unwrap_or_default(),
277 stream_cipher: s.stream_cipher.map(Into::into),
278 create_stream_on_append: s.create_stream_on_append.unwrap_or_default(),
279 create_stream_on_read: s.create_stream_on_read.unwrap_or_default(),
280 }
281 }
282}
283
284impl From<TimestampingSpec> for OptionalTimestampingConfig {
285 fn from(s: TimestampingSpec) -> Self {
286 OptionalTimestampingConfig {
287 mode: s.mode.map(Into::into),
288 uncapped: s.uncapped,
289 }
290 }
291}
292
293impl From<DeleteOnEmptySpec> for OptionalDeleteOnEmptyConfig {
294 fn from(s: DeleteOnEmptySpec) -> Self {
295 OptionalDeleteOnEmptyConfig {
296 min_age: s.min_age.map(|h| h.0),
297 }
298 }
299}
300
301impl From<StreamConfigSpec> for OptionalStreamConfig {
302 fn from(s: StreamConfigSpec) -> Self {
303 OptionalStreamConfig {
304 storage_class: s.storage_class.map(Into::into),
305 retention_policy: s.retention_policy.map(|rp| rp.0),
306 timestamping: s.timestamping.map(Into::into).unwrap_or_default(),
307 delete_on_empty: s.delete_on_empty.map(Into::into).unwrap_or_default(),
308 }
309 }
310}
311
312pub fn json_schema() -> serde_json::Value {
313 serde_json::to_value(schemars::schema_for!(ResourcesSpec)).unwrap()
314}
315
316pub fn validate(spec: &ResourcesSpec) -> eyre::Result<()> {
317 let mut errors = Vec::new();
318 let mut seen_basins = std::collections::HashSet::new();
319
320 for basin_spec in &spec.basins {
321 if !seen_basins.insert(basin_spec.name.clone()) {
322 errors.push(format!("duplicate basin name {:?}", basin_spec.name));
323 }
324
325 if let Err(e) = basin_spec.name.parse::<BasinName>() {
326 errors.push(format!("invalid basin name {:?}: {}", basin_spec.name, e));
327 continue;
328 }
329
330 let mut seen_streams = std::collections::HashSet::new();
331 for stream_spec in &basin_spec.streams {
332 if !seen_streams.insert(stream_spec.name.clone()) {
333 errors.push(format!(
334 "duplicate stream name {:?} in basin {:?}",
335 stream_spec.name, basin_spec.name
336 ));
337 }
338 if let Err(e) = stream_spec.name.parse::<StreamName>() {
339 errors.push(format!(
340 "invalid stream name {:?} in basin {:?}: {}",
341 stream_spec.name, basin_spec.name, e
342 ));
343 }
344 }
345 }
346
347 if errors.is_empty() {
348 Ok(())
349 } else {
350 Err(eyre::eyre!("{}", errors.join("\n")))
351 }
352}
353
354pub fn load(path: &Path) -> eyre::Result<ResourcesSpec> {
355 let contents = std::fs::read_to_string(path)
356 .map_err(|e| eyre::eyre!("failed to read init file {:?}: {}", path, e))?;
357 let spec: ResourcesSpec = serde_json::from_str(&contents)
358 .map_err(|e| eyre::eyre!("failed to parse init file {:?}: {}", path, e))?;
359 Ok(spec)
360}
361
362pub async fn apply(backend: &Backend, spec: ResourcesSpec) -> eyre::Result<()> {
363 validate(&spec)?;
364
365 for basin_spec in spec.basins {
366 let basin: BasinName = basin_spec
367 .name
368 .parse()
369 .map_err(|e| eyre::eyre!("invalid basin name {:?}: {}", basin_spec.name, e))?;
370
371 let config = basin_spec.config.map(BasinConfig::from).unwrap_or_default();
372
373 backend
374 .provision_basin(basin.clone(), config, ProvisionMode::Ensure)
375 .await
376 .map_err(|e| eyre::eyre!("failed to apply basin {:?}: {}", basin.as_ref(), e))?;
377
378 info!(basin = basin.as_ref(), "basin applied");
379
380 for stream_spec in basin_spec.streams {
381 let stream: StreamName = stream_spec
382 .name
383 .parse()
384 .map_err(|e| eyre::eyre!("invalid stream name {:?}: {}", stream_spec.name, e))?;
385
386 let config = stream_spec
387 .config
388 .map(OptionalStreamConfig::from)
389 .unwrap_or_default();
390
391 backend
392 .provision_stream(basin.clone(), stream.clone(), config, ProvisionMode::Ensure)
393 .await
394 .map_err(|e| {
395 eyre::eyre!(
396 "failed to apply stream {:?}/{:?}: {}",
397 basin.as_ref(),
398 stream.as_ref(),
399 e
400 )
401 })?;
402
403 info!(
404 basin = basin.as_ref(),
405 stream = stream.as_ref(),
406 "stream applied"
407 );
408 }
409 }
410 Ok(())
411}
412
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses a `ResourcesSpec` from JSON, panicking if it does not deserialize.
    fn parse_spec(json: &str) -> ResourcesSpec {
        serde_json::from_str(json).expect("valid JSON")
    }

    #[test]
    fn empty_spec() {
        let spec = parse_spec("{}");
        assert!(spec.basins.is_empty());
    }

    #[test]
    fn basin_no_config() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin"}]}"#);
        assert_eq!(spec.basins.len(), 1);
        assert_eq!(spec.basins[0].name, "my-basin");
        assert!(spec.basins[0].config.is_none());
        assert!(spec.basins[0].streams.is_empty());
    }

    #[test]
    fn retention_policy_infinite() {
        let rp: RetentionPolicySpec = serde_json::from_str(r#""infinite""#).expect("deserialize");
        assert!(matches!(rp.0, RetentionPolicy::Infinite()));
    }

    #[test]
    fn retention_policy_infinite_is_case_insensitive() {
        let rp: RetentionPolicySpec = serde_json::from_str(r#""INFINITE""#).expect("deserialize");
        assert!(matches!(rp.0, RetentionPolicy::Infinite()));
    }

    #[test]
    fn retention_policy_duration() {
        let rp: RetentionPolicySpec = serde_json::from_str(r#""7days""#).expect("deserialize");
        let RetentionPolicy::Age(d) = rp.0 else {
            panic!("expected an age-based retention policy, got {:?}", rp.0);
        };
        assert_eq!(d, Duration::from_secs(7 * 24 * 3600));
    }

    #[test]
    fn retention_policy_invalid() {
        let err = serde_json::from_str::<RetentionPolicySpec>(r#""not-a-duration""#);
        assert!(err.is_err());
    }

    #[test]
    fn human_duration() {
        let hd: HumanDuration = serde_json::from_str(r#""1day""#).expect("deserialize");
        assert_eq!(hd.0, Duration::from_secs(86400));
    }

    #[test]
    fn human_duration_invalid() {
        let err = serde_json::from_str::<HumanDuration>(r#""not-a-duration""#);
        assert!(err.is_err());
    }

    #[test]
    fn encryption_algorithm_names() {
        let aegis: EncryptionAlgorithmSpec =
            serde_json::from_str(r#""aegis-256""#).expect("deserialize");
        assert!(matches!(aegis, EncryptionAlgorithmSpec::Aegis256));

        let gcm: EncryptionAlgorithmSpec =
            serde_json::from_str(r#""aes-256-gcm""#).expect("deserialize");
        assert!(matches!(gcm, EncryptionAlgorithmSpec::Aes256Gcm));

        // Only the hyphenated wire names are accepted.
        assert!(serde_json::from_str::<EncryptionAlgorithmSpec>(r#""aegis256""#).is_err());
    }

    #[test]
    fn timestamping_mode_kebab_case() {
        let ts: TimestampingSpec =
            serde_json::from_str(r#"{"mode":"client-require"}"#).expect("deserialize");
        assert!(matches!(ts.mode, Some(TimestampingModeSpec::ClientRequire)));
        assert_eq!(ts.uncapped, None);
    }

    #[test]
    fn unknown_fields_rejected() {
        // Nested spec types use `deny_unknown_fields`, so typos surface as parse errors.
        let basin_typo = serde_json::from_str::<ResourcesSpec>(
            r#"{"basins":[{"name":"my-basin","stream":[]}]}"#,
        );
        assert!(basin_typo.is_err());

        let config_typo = serde_json::from_str::<ResourcesSpec>(
            r#"{"basins":[{"name":"my-basin","streams":[{"name":"events","config":{"storage":"express"}}]}]}"#,
        );
        assert!(config_typo.is_err());
    }

    #[test]
    fn full_spec_roundtrip() {
        let json = r#"
        {
            "basins": [
                {
                    "name": "my-basin",
                    "config": {
                        "create_stream_on_append": true,
                        "create_stream_on_read": false,
                        "default_stream_config": {
                            "storage_class": "express",
                            "retention_policy": "7days",
                            "timestamping": {
                                "mode": "client-prefer",
                                "uncapped": false
                            },
                            "delete_on_empty": {
                                "min_age": "1day"
                            }
                        }
                    },
                    "streams": [
                        {
                            "name": "events",
                            "config": {
                                "storage_class": "standard",
                                "retention_policy": "infinite"
                            }
                        }
                    ]
                }
            ]
        }"#;

        let spec = parse_spec(json);
        assert_eq!(spec.basins.len(), 1);
        let basin = &spec.basins[0];
        assert_eq!(basin.name, "my-basin");

        let config = basin.config.as_ref().unwrap();
        assert_eq!(config.create_stream_on_append, Some(true));
        assert_eq!(config.create_stream_on_read, Some(false));

        let dsc = config.default_stream_config.as_ref().unwrap();
        assert!(matches!(dsc.storage_class, Some(StorageClassSpec::Express)));
        assert!(matches!(
            dsc.retention_policy.as_ref().map(|r| &r.0),
            Some(RetentionPolicy::Age(_))
        ));

        let ts = dsc.timestamping.as_ref().unwrap();
        assert!(matches!(ts.mode, Some(TimestampingModeSpec::ClientPrefer)));
        assert_eq!(ts.uncapped, Some(false));

        let doe = dsc.delete_on_empty.as_ref().unwrap();
        assert_eq!(
            doe.min_age.as_ref().map(|h| h.0),
            Some(Duration::from_secs(86400))
        );

        assert_eq!(basin.streams.len(), 1);
        let stream = &basin.streams[0];
        assert_eq!(stream.name, "events");
        let sc = stream.config.as_ref().unwrap();
        assert!(matches!(sc.storage_class, Some(StorageClassSpec::Standard)));
        assert!(matches!(
            sc.retention_policy.as_ref().map(|r| &r.0),
            Some(RetentionPolicy::Infinite())
        ));
    }

    #[test]
    fn basin_config_conversion() {
        let spec = BasinConfigSpec {
            default_stream_config: None,
            stream_cipher: None,
            create_stream_on_append: Some(true),
            create_stream_on_read: None,
        };
        let config = BasinConfig::from(spec);
        assert!(config.create_stream_on_append);
        assert!(!config.create_stream_on_read);
        assert_eq!(
            config.default_stream_config,
            OptionalStreamConfig::default()
        );
    }

    #[test]
    fn validate_valid_spec() {
        let spec = parse_spec(
            r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"logs"}]}]}"#,
        );
        assert!(validate(&spec).is_ok());
    }

    #[test]
    fn validate_invalid_basin_name() {
        let spec = parse_spec(r#"{"basins":[{"name":"INVALID_BASIN"}]}"#);
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("invalid basin name"));
    }

    #[test]
    fn validate_invalid_stream_name() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin","streams":[{"name":""}]}]}"#);
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("invalid stream name"));
    }

    #[test]
    fn validate_duplicate_basin_names() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin"},{"name":"my-basin"}]}"#);
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("duplicate basin name"));
    }

    #[test]
    fn validate_duplicate_stream_names() {
        let spec = parse_spec(
            r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"events"}]}]}"#,
        );
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("duplicate stream name"));
    }

    #[test]
    fn validate_multiple_errors() {
        let spec = parse_spec(r#"{"basins":[{"name":"INVALID"},{"name":"INVALID"}]}"#);
        let err = validate(&spec).unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("invalid basin name"));
        assert!(msg.contains("duplicate basin name"));
    }

    #[test]
    fn json_schema_is_valid() {
        let schema = json_schema();
        assert!(schema.is_object());
        let schema_obj = schema.as_object().unwrap();

        assert_eq!(
            schema_obj.get("$schema"),
            Some(&serde_json::Value::String(
                "https://json-schema.org/draft/2020-12/schema".to_string()
            ))
        );

        assert!(
            schema_obj.contains_key("properties"),
            "schema should have root properties"
        );

        assert!(
            schema_obj.contains_key("$defs"),
            "schema should have $defs for reusable definitions"
        );

        let properties = schema_obj.get("properties").unwrap().as_object().unwrap();
        assert!(
            properties.contains_key("basins"),
            "schema should include the `basins` property"
        );
    }

    #[test]
    fn stream_config_conversion() {
        let spec = StreamConfigSpec {
            storage_class: Some(StorageClassSpec::Standard),
            retention_policy: Some(RetentionPolicySpec(RetentionPolicy::Infinite())),
            timestamping: None,
            delete_on_empty: None,
        };
        let config = OptionalStreamConfig::from(spec);
        assert_eq!(config.storage_class, Some(StorageClass::Standard));
        assert_eq!(config.retention_policy, Some(RetentionPolicy::Infinite()));
        assert_eq!(config.timestamping, OptionalTimestampingConfig::default());
        assert_eq!(
            config.delete_on_empty,
            OptionalDeleteOnEmptyConfig::default()
        );
    }
}