1use std::{path::Path, time::Duration};
6
7use s2_common::{
8 maybe::Maybe,
9 types::{
10 basin::BasinName,
11 config::{
12 BasinReconfiguration, DeleteOnEmptyReconfiguration, RetentionPolicy, StorageClass,
13 StreamReconfiguration, TimestampingMode, TimestampingReconfiguration,
14 },
15 resources::CreateMode,
16 stream::StreamName,
17 },
18};
19use serde::{Deserialize, Serialize};
20use tracing::info;
21
22use crate::backend::Backend;
23
24#[derive(Debug, Deserialize, Default, schemars::JsonSchema)]
25pub struct ResourcesSpec {
26 #[serde(default)]
27 pub basins: Vec<BasinSpec>,
28}
29
30#[derive(Debug, Deserialize, schemars::JsonSchema)]
31#[serde(deny_unknown_fields)]
32pub struct BasinSpec {
33 pub name: String,
34 #[serde(default)]
35 pub config: Option<BasinConfigSpec>,
36 #[serde(default)]
37 pub streams: Vec<StreamSpec>,
38}
39
40#[derive(Debug, Deserialize, schemars::JsonSchema)]
41#[serde(deny_unknown_fields)]
42pub struct StreamSpec {
43 pub name: String,
44 #[serde(default)]
45 pub config: Option<StreamConfigSpec>,
46}
47
48#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
49#[serde(deny_unknown_fields)]
50pub struct BasinConfigSpec {
51 #[serde(default)]
52 pub default_stream_config: Option<StreamConfigSpec>,
53 #[serde(default)]
55 pub create_stream_on_append: Option<bool>,
56 #[serde(default)]
58 pub create_stream_on_read: Option<bool>,
59}
60
61#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
62#[serde(deny_unknown_fields)]
63pub struct StreamConfigSpec {
64 #[serde(default)]
66 pub storage_class: Option<StorageClassSpec>,
67 #[serde(default)]
70 pub retention_policy: Option<RetentionPolicySpec>,
71 #[serde(default)]
73 pub timestamping: Option<TimestampingSpec>,
74 #[serde(default)]
76 pub delete_on_empty: Option<DeleteOnEmptySpec>,
77}
78
79#[derive(Debug, Clone, Deserialize, Serialize)]
80#[serde(rename_all = "kebab-case")]
81pub enum StorageClassSpec {
82 Standard,
83 Express,
84}
85
86impl schemars::JsonSchema for StorageClassSpec {
87 fn schema_name() -> String {
88 "StorageClassSpec".to_string()
89 }
90
91 fn json_schema(_: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
92 schemars::schema::Schema::Object(schemars::schema::SchemaObject {
93 instance_type: Some(schemars::schema::InstanceType::String.into()),
94 metadata: Some(Box::new(schemars::schema::Metadata {
95 description: Some("Storage class for recent writes.".to_string()),
96 ..Default::default()
97 })),
98 enum_values: Some(vec![
99 serde_json::Value::String("standard".to_string()),
100 serde_json::Value::String("express".to_string()),
101 ]),
102 ..Default::default()
103 })
104 }
105}
106
107impl From<StorageClassSpec> for StorageClass {
108 fn from(s: StorageClassSpec) -> Self {
109 match s {
110 StorageClassSpec::Standard => StorageClass::Standard,
111 StorageClassSpec::Express => StorageClass::Express,
112 }
113 }
114}
115
116#[derive(Debug, Clone, Copy)]
118pub struct RetentionPolicySpec(pub RetentionPolicy);
119
120impl RetentionPolicySpec {
121 pub fn age_secs(self) -> Option<u64> {
122 self.0.age().map(|d| d.as_secs())
123 }
124}
125
126impl TryFrom<String> for RetentionPolicySpec {
127 type Error = String;
128
129 fn try_from(s: String) -> Result<Self, Self::Error> {
130 if s.eq_ignore_ascii_case("infinite") {
131 return Ok(RetentionPolicySpec(RetentionPolicy::Infinite()));
132 }
133 let d = humantime::parse_duration(&s)
134 .map_err(|e| format!("invalid retention_policy {:?}: {}", s, e))?;
135 Ok(RetentionPolicySpec(RetentionPolicy::Age(d)))
136 }
137}
138
139impl<'de> Deserialize<'de> for RetentionPolicySpec {
140 fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
141 let s = String::deserialize(d)?;
142 RetentionPolicySpec::try_from(s).map_err(serde::de::Error::custom)
143 }
144}
145
146impl schemars::JsonSchema for RetentionPolicySpec {
147 fn schema_name() -> String {
148 "RetentionPolicySpec".to_string()
149 }
150
151 fn json_schema(_: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
152 schemars::schema::Schema::Object(schemars::schema::SchemaObject {
153 instance_type: Some(schemars::schema::InstanceType::String.into()),
154 metadata: Some(Box::new(schemars::schema::Metadata {
155 description: Some(
156 "Retain records unless explicitly trimmed (\"infinite\"), or automatically \
157 trim records older than the given duration (e.g. \"7days\", \"1week\")."
158 .to_string(),
159 ),
160 examples: vec![
161 serde_json::Value::String("infinite".to_string()),
162 serde_json::Value::String("7days".to_string()),
163 serde_json::Value::String("1week".to_string()),
164 ],
165 ..Default::default()
166 })),
167 ..Default::default()
168 })
169 }
170}
171
172#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
173#[serde(deny_unknown_fields)]
174pub struct TimestampingSpec {
175 #[serde(default)]
177 pub mode: Option<TimestampingModeSpec>,
178 #[serde(default)]
181 pub uncapped: Option<bool>,
182}
183
184#[derive(Debug, Clone, Deserialize, Serialize)]
185#[serde(rename_all = "kebab-case")]
186pub enum TimestampingModeSpec {
187 ClientPrefer,
188 ClientRequire,
189 Arrival,
190}
191
192impl schemars::JsonSchema for TimestampingModeSpec {
193 fn schema_name() -> String {
194 "TimestampingModeSpec".to_string()
195 }
196
197 fn json_schema(_: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
198 schemars::schema::Schema::Object(schemars::schema::SchemaObject {
199 instance_type: Some(schemars::schema::InstanceType::String.into()),
200 metadata: Some(Box::new(schemars::schema::Metadata {
201 description: Some(
202 "Timestamping mode for appends that influences how timestamps are handled."
203 .to_string(),
204 ),
205 ..Default::default()
206 })),
207 enum_values: Some(vec![
208 serde_json::Value::String("client-prefer".to_string()),
209 serde_json::Value::String("client-require".to_string()),
210 serde_json::Value::String("arrival".to_string()),
211 ]),
212 ..Default::default()
213 })
214 }
215}
216
217impl From<TimestampingModeSpec> for TimestampingMode {
218 fn from(m: TimestampingModeSpec) -> Self {
219 match m {
220 TimestampingModeSpec::ClientPrefer => TimestampingMode::ClientPrefer,
221 TimestampingModeSpec::ClientRequire => TimestampingMode::ClientRequire,
222 TimestampingModeSpec::Arrival => TimestampingMode::Arrival,
223 }
224 }
225}
226
227#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
228#[serde(deny_unknown_fields)]
229pub struct DeleteOnEmptySpec {
230 #[serde(default)]
233 pub min_age: Option<HumanDuration>,
234}
235
236#[derive(Debug, Clone, Copy)]
238pub struct HumanDuration(pub Duration);
239
240impl TryFrom<String> for HumanDuration {
241 type Error = String;
242
243 fn try_from(s: String) -> Result<Self, Self::Error> {
244 humantime::parse_duration(&s)
245 .map(HumanDuration)
246 .map_err(|e| format!("invalid duration {:?}: {}", s, e))
247 }
248}
249
250impl<'de> Deserialize<'de> for HumanDuration {
251 fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
252 let s = String::deserialize(d)?;
253 HumanDuration::try_from(s).map_err(serde::de::Error::custom)
254 }
255}
256
257impl schemars::JsonSchema for HumanDuration {
258 fn schema_name() -> String {
259 "HumanDuration".to_string()
260 }
261
262 fn json_schema(_: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
263 schemars::schema::Schema::Object(schemars::schema::SchemaObject {
264 instance_type: Some(schemars::schema::InstanceType::String.into()),
265 metadata: Some(Box::new(schemars::schema::Metadata {
266 description: Some(
267 "A duration string in humantime format, e.g. \"1day\", \"2h 30m\"".to_string(),
268 ),
269 examples: vec![
270 serde_json::Value::String("1day".to_string()),
271 serde_json::Value::String("2h 30m".to_string()),
272 ],
273 ..Default::default()
274 })),
275 ..Default::default()
276 })
277 }
278}
279
280impl From<BasinConfigSpec> for BasinReconfiguration {
281 fn from(s: BasinConfigSpec) -> Self {
282 BasinReconfiguration {
283 default_stream_config: s
284 .default_stream_config
285 .map(|dsc| Some(StreamReconfiguration::from(dsc)))
286 .map_or(Maybe::Unspecified, Maybe::Specified),
287 create_stream_on_append: s
288 .create_stream_on_append
289 .map_or(Maybe::Unspecified, Maybe::Specified),
290 create_stream_on_read: s
291 .create_stream_on_read
292 .map_or(Maybe::Unspecified, Maybe::Specified),
293 }
294 }
295}
296
297impl From<StreamConfigSpec> for StreamReconfiguration {
298 fn from(s: StreamConfigSpec) -> Self {
299 StreamReconfiguration {
300 storage_class: s
301 .storage_class
302 .map(|sc| Some(StorageClass::from(sc)))
303 .map_or(Maybe::Unspecified, Maybe::Specified),
304 retention_policy: s
305 .retention_policy
306 .map(|rp| Some(rp.0))
307 .map_or(Maybe::Unspecified, Maybe::Specified),
308 timestamping: s
309 .timestamping
310 .map(|ts| {
311 Some(TimestampingReconfiguration {
312 mode: ts
313 .mode
314 .map(|m| Some(TimestampingMode::from(m)))
315 .map_or(Maybe::Unspecified, Maybe::Specified),
316 uncapped: ts
317 .uncapped
318 .map(Some)
319 .map_or(Maybe::Unspecified, Maybe::Specified),
320 })
321 })
322 .map_or(Maybe::Unspecified, Maybe::Specified),
323 delete_on_empty: s
324 .delete_on_empty
325 .map(|doe| {
326 Some(DeleteOnEmptyReconfiguration {
327 min_age: doe
328 .min_age
329 .map(|h| Some(h.0))
330 .map_or(Maybe::Unspecified, Maybe::Specified),
331 })
332 })
333 .map_or(Maybe::Unspecified, Maybe::Specified),
334 }
335 }
336}
337
338pub fn json_schema() -> serde_json::Value {
339 serde_json::to_value(schemars::schema_for!(ResourcesSpec)).unwrap()
340}
341
342pub fn validate(spec: &ResourcesSpec) -> eyre::Result<()> {
343 let mut errors = Vec::new();
344 let mut seen_basins = std::collections::HashSet::new();
345
346 for basin_spec in &spec.basins {
347 if !seen_basins.insert(basin_spec.name.clone()) {
348 errors.push(format!("duplicate basin name {:?}", basin_spec.name));
349 }
350
351 if let Err(e) = basin_spec.name.parse::<BasinName>() {
352 errors.push(format!("invalid basin name {:?}: {}", basin_spec.name, e));
353 continue;
354 }
355
356 let mut seen_streams = std::collections::HashSet::new();
357 for stream_spec in &basin_spec.streams {
358 if !seen_streams.insert(stream_spec.name.clone()) {
359 errors.push(format!(
360 "duplicate stream name {:?} in basin {:?}",
361 stream_spec.name, basin_spec.name
362 ));
363 }
364 if let Err(e) = stream_spec.name.parse::<StreamName>() {
365 errors.push(format!(
366 "invalid stream name {:?} in basin {:?}: {}",
367 stream_spec.name, basin_spec.name, e
368 ));
369 }
370 }
371 }
372
373 if errors.is_empty() {
374 Ok(())
375 } else {
376 Err(eyre::eyre!("{}", errors.join("\n")))
377 }
378}
379
380pub fn load(path: &Path) -> eyre::Result<ResourcesSpec> {
381 let contents = std::fs::read_to_string(path)
382 .map_err(|e| eyre::eyre!("failed to read init file {:?}: {}", path, e))?;
383 let spec: ResourcesSpec = serde_json::from_str(&contents)
384 .map_err(|e| eyre::eyre!("failed to parse init file {:?}: {}", path, e))?;
385 Ok(spec)
386}
387
388pub async fn apply(backend: &Backend, spec: ResourcesSpec) -> eyre::Result<()> {
389 validate(&spec)?;
390
391 for basin_spec in spec.basins {
392 let basin: BasinName = basin_spec
393 .name
394 .parse()
395 .map_err(|e| eyre::eyre!("invalid basin name {:?}: {}", basin_spec.name, e))?;
396
397 let reconfiguration = basin_spec
398 .config
399 .map(BasinReconfiguration::from)
400 .unwrap_or_default();
401
402 backend
403 .create_basin(
404 basin.clone(),
405 reconfiguration,
406 CreateMode::CreateOrReconfigure,
407 )
408 .await
409 .map_err(|e| eyre::eyre!("failed to apply basin {:?}: {}", basin.as_ref(), e))?;
410
411 info!(basin = basin.as_ref(), "basin applied");
412
413 for stream_spec in basin_spec.streams {
414 let stream: StreamName = stream_spec
415 .name
416 .parse()
417 .map_err(|e| eyre::eyre!("invalid stream name {:?}: {}", stream_spec.name, e))?;
418
419 let reconfiguration = stream_spec
420 .config
421 .map(StreamReconfiguration::from)
422 .unwrap_or_default();
423
424 backend
425 .create_stream(
426 basin.clone(),
427 stream.clone(),
428 reconfiguration,
429 CreateMode::CreateOrReconfigure,
430 )
431 .await
432 .map_err(|e| {
433 eyre::eyre!(
434 "failed to apply stream {:?}/{:?}: {}",
435 basin.as_ref(),
436 stream.as_ref(),
437 e
438 )
439 })?;
440
441 info!(
442 basin = basin.as_ref(),
443 stream = stream.as_ref(),
444 "stream applied"
445 );
446 }
447 }
448 Ok(())
449}
450
451#[cfg(test)]
452mod tests {
453 use super::*;
454
455 fn parse_spec(json: &str) -> ResourcesSpec {
456 serde_json::from_str(json).expect("valid JSON")
457 }
458
459 #[test]
460 fn empty_spec() {
461 let spec = parse_spec("{}");
462 assert!(spec.basins.is_empty());
463 }
464
465 #[test]
466 fn basin_no_config() {
467 let spec = parse_spec(r#"{"basins":[{"name":"my-basin"}]}"#);
468 assert_eq!(spec.basins.len(), 1);
469 assert_eq!(spec.basins[0].name, "my-basin");
470 assert!(spec.basins[0].config.is_none());
471 assert!(spec.basins[0].streams.is_empty());
472 }
473
474 #[test]
475 fn retention_policy_infinite() {
476 let rp: RetentionPolicySpec = serde_json::from_str(r#""infinite""#).expect("deserialize");
477 assert!(matches!(rp.0, RetentionPolicy::Infinite()));
478 }
479
480 #[test]
481 fn retention_policy_duration() {
482 let rp: RetentionPolicySpec = serde_json::from_str(r#""7days""#).expect("deserialize");
483 assert!(matches!(rp.0, RetentionPolicy::Age(_)));
484 if let RetentionPolicy::Age(d) = rp.0 {
485 assert_eq!(d, Duration::from_secs(7 * 24 * 3600));
486 }
487 }
488
489 #[test]
490 fn retention_policy_invalid() {
491 let err = serde_json::from_str::<RetentionPolicySpec>(r#""not-a-duration""#);
492 assert!(err.is_err());
493 }
494
495 #[test]
496 fn human_duration() {
497 let hd: HumanDuration = serde_json::from_str(r#""1day""#).expect("deserialize");
498 assert_eq!(hd.0, Duration::from_secs(86400));
499 }
500
501 #[test]
502 fn full_spec_roundtrip() {
503 let json = r#"
504 {
505 "basins": [
506 {
507 "name": "my-basin",
508 "config": {
509 "create_stream_on_append": true,
510 "create_stream_on_read": false,
511 "default_stream_config": {
512 "storage_class": "express",
513 "retention_policy": "7days",
514 "timestamping": {
515 "mode": "client-prefer",
516 "uncapped": false
517 },
518 "delete_on_empty": {
519 "min_age": "1day"
520 }
521 }
522 },
523 "streams": [
524 {
525 "name": "events",
526 "config": {
527 "storage_class": "standard",
528 "retention_policy": "infinite"
529 }
530 }
531 ]
532 }
533 ]
534 }"#;
535
536 let spec = parse_spec(json);
537 assert_eq!(spec.basins.len(), 1);
538 let basin = &spec.basins[0];
539 assert_eq!(basin.name, "my-basin");
540
541 let config = basin.config.as_ref().unwrap();
542 assert_eq!(config.create_stream_on_append, Some(true));
543 assert_eq!(config.create_stream_on_read, Some(false));
544
545 let dsc = config.default_stream_config.as_ref().unwrap();
546 assert!(matches!(dsc.storage_class, Some(StorageClassSpec::Express)));
547 assert!(matches!(
548 dsc.retention_policy.as_ref().map(|r| &r.0),
549 Some(RetentionPolicy::Age(_))
550 ));
551
552 let ts = dsc.timestamping.as_ref().unwrap();
553 assert!(matches!(ts.mode, Some(TimestampingModeSpec::ClientPrefer)));
554 assert_eq!(ts.uncapped, Some(false));
555
556 let doe = dsc.delete_on_empty.as_ref().unwrap();
557 assert_eq!(
558 doe.min_age.as_ref().map(|h| h.0),
559 Some(Duration::from_secs(86400))
560 );
561
562 assert_eq!(basin.streams.len(), 1);
563 let stream = &basin.streams[0];
564 assert_eq!(stream.name, "events");
565 let sc = stream.config.as_ref().unwrap();
566 assert!(matches!(sc.storage_class, Some(StorageClassSpec::Standard)));
567 assert!(matches!(
568 sc.retention_policy.as_ref().map(|r| &r.0),
569 Some(RetentionPolicy::Infinite())
570 ));
571 }
572
573 #[test]
574 fn basin_config_conversion() {
575 let spec = BasinConfigSpec {
576 default_stream_config: None,
577 create_stream_on_append: Some(true),
578 create_stream_on_read: None,
579 };
580 let reconfig = BasinReconfiguration::from(spec);
581 assert!(matches!(
582 reconfig.create_stream_on_append,
583 Maybe::Specified(true)
584 ));
585 assert!(matches!(reconfig.create_stream_on_read, Maybe::Unspecified));
586 assert!(matches!(reconfig.default_stream_config, Maybe::Unspecified));
587 }
588
589 #[test]
590 fn validate_valid_spec() {
591 let spec = parse_spec(
592 r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"logs"}]}]}"#,
593 );
594 assert!(validate(&spec).is_ok());
595 }
596
597 #[test]
598 fn validate_invalid_basin_name() {
599 let spec = parse_spec(r#"{"basins":[{"name":"INVALID_BASIN"}]}"#);
600 let err = validate(&spec).unwrap_err();
601 assert!(err.to_string().contains("invalid basin name"));
602 }
603
604 #[test]
605 fn validate_invalid_stream_name() {
606 let spec = parse_spec(r#"{"basins":[{"name":"my-basin","streams":[{"name":""}]}]}"#);
607 let err = validate(&spec).unwrap_err();
608 assert!(err.to_string().contains("invalid stream name"));
609 }
610
611 #[test]
612 fn validate_duplicate_basin_names() {
613 let spec = parse_spec(r#"{"basins":[{"name":"my-basin"},{"name":"my-basin"}]}"#);
614 let err = validate(&spec).unwrap_err();
615 assert!(err.to_string().contains("duplicate basin name"));
616 }
617
618 #[test]
619 fn validate_duplicate_stream_names() {
620 let spec = parse_spec(
621 r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"events"}]}]}"#,
622 );
623 let err = validate(&spec).unwrap_err();
624 assert!(err.to_string().contains("duplicate stream name"));
625 }
626
627 #[test]
628 fn validate_multiple_errors() {
629 let spec = parse_spec(r#"{"basins":[{"name":"INVALID"},{"name":"INVALID"}]}"#);
630 let err = validate(&spec).unwrap_err();
631 let msg = err.to_string();
632 assert!(msg.contains("invalid basin name"));
633 assert!(msg.contains("duplicate basin name"));
634 }
635
636 #[test]
637 fn json_schema_is_valid() {
638 let schema = json_schema();
639 assert!(schema.is_object());
640 let schema_obj = schema.as_object().unwrap();
641 assert!(
643 schema_obj.contains_key("definitions") || schema_obj.contains_key("properties"),
644 "schema should have definitions or properties"
645 );
646 }
647
648 #[test]
649 fn stream_config_conversion() {
650 let spec = StreamConfigSpec {
651 storage_class: Some(StorageClassSpec::Standard),
652 retention_policy: Some(RetentionPolicySpec(RetentionPolicy::Infinite())),
653 timestamping: None,
654 delete_on_empty: None,
655 };
656 let reconfig = StreamReconfiguration::from(spec);
657 assert!(matches!(
658 reconfig.storage_class,
659 Maybe::Specified(Some(StorageClass::Standard))
660 ));
661 assert!(matches!(
662 reconfig.retention_policy,
663 Maybe::Specified(Some(RetentionPolicy::Infinite()))
664 ));
665 assert!(matches!(reconfig.timestamping, Maybe::Unspecified));
666 assert!(matches!(reconfig.delete_on_empty, Maybe::Unspecified));
667 }
668}