1use std::{borrow::Cow, path::Path, time::Duration};
6
7use s2_common::{
8 maybe::Maybe,
9 types::{
10 basin::BasinName,
11 config::{
12 BasinReconfiguration, DeleteOnEmptyReconfiguration, RetentionPolicy, StorageClass,
13 StreamReconfiguration, TimestampingMode, TimestampingReconfiguration,
14 },
15 resources::CreateMode,
16 stream::StreamName,
17 },
18};
19use serde::{Deserialize, Serialize};
20use tracing::info;
21
22use crate::backend::Backend;
23
// Top-level document of a resources spec, as parsed by `load` from an init file.
//
// Unlike the nested spec types in this file, this struct is not marked
// `deny_unknown_fields`, so unrecognized top-level keys are ignored when
// deserializing.
//
// NOTE: plain `//` comments are used instead of `///` because schemars copies doc
// comments on `JsonSchema`-derived items into the generated schema.
#[derive(Debug, Deserialize, Default, schemars::JsonSchema)]
pub struct ResourcesSpec {
    // Basins to apply; a missing key deserializes to an empty list.
    #[serde(default)]
    pub basins: Vec<BasinSpec>,
}
29
// One basin entry of a resources spec. Unknown keys are rejected.
//
// NOTE: plain `//` comments are used instead of `///` because schemars copies doc
// comments on `JsonSchema`-derived items into the generated schema.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct BasinSpec {
    // Raw basin name; `validate` checks that it parses as a `BasinName` and is
    // unique within the spec.
    pub name: String,
    // Optional basin configuration; when absent, `apply` uses a default
    // `BasinReconfiguration`.
    #[serde(default)]
    pub config: Option<BasinConfigSpec>,
    // Streams to apply inside this basin; a missing key means no streams.
    #[serde(default)]
    pub streams: Vec<StreamSpec>,
}
39
// One stream entry inside a `BasinSpec`. Unknown keys are rejected.
//
// NOTE: plain `//` comments are used instead of `///` because schemars copies doc
// comments on `JsonSchema`-derived items into the generated schema.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct StreamSpec {
    // Raw stream name; `validate` checks that it parses as a `StreamName` and is
    // unique within its basin.
    pub name: String,
    // Optional stream configuration; when absent, `apply` uses a default
    // `StreamReconfiguration`.
    #[serde(default)]
    pub config: Option<StreamConfigSpec>,
}
47
// Basin-level configuration as written in a spec file. Unknown keys are rejected.
//
// Every field is optional; a field left out is converted to `Maybe::Unspecified`
// by the `From<BasinConfigSpec> for BasinReconfiguration` impl.
//
// NOTE: plain `//` comments are used instead of `///` because schemars copies doc
// comments on `JsonSchema`-derived items into the generated schema.
#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct BasinConfigSpec {
    // Forwarded as `BasinReconfiguration::default_stream_config`.
    #[serde(default)]
    pub default_stream_config: Option<StreamConfigSpec>,
    // Forwarded as `BasinReconfiguration::create_stream_on_append`.
    #[serde(default)]
    pub create_stream_on_append: Option<bool>,
    // Forwarded as `BasinReconfiguration::create_stream_on_read`.
    #[serde(default)]
    pub create_stream_on_read: Option<bool>,
}
60
// Stream-level configuration as written in a spec file. Unknown keys are rejected.
//
// Every field is optional; a field left out is converted to `Maybe::Unspecified`
// by the `From<StreamConfigSpec> for StreamReconfiguration` impl.
//
// NOTE: plain `//` comments are used instead of `///` because schemars copies doc
// comments on `JsonSchema`-derived items into the generated schema.
#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct StreamConfigSpec {
    // Either `"standard"` or `"express"`.
    #[serde(default)]
    pub storage_class: Option<StorageClassSpec>,
    // Either `"infinite"` or a humantime duration string such as `"7days"`.
    #[serde(default)]
    pub retention_policy: Option<RetentionPolicySpec>,
    // Timestamping settings (mode and `uncapped` flag).
    #[serde(default)]
    pub timestamping: Option<TimestampingSpec>,
    // Delete-on-empty settings (`min_age`).
    #[serde(default)]
    pub delete_on_empty: Option<DeleteOnEmptySpec>,
}
78
/// Storage class as spelled in a spec file.
///
/// Variants are (de)serialized with kebab-case names, i.e. `"standard"` and
/// `"express"`, and map one-to-one onto [`StorageClass`].
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum StorageClassSpec {
    Standard,
    Express,
}
85
86impl schemars::JsonSchema for StorageClassSpec {
87 fn schema_name() -> Cow<'static, str> {
88 "StorageClassSpec".into()
89 }
90
91 fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
92 schemars::json_schema!({
93 "type": "string",
94 "description": "Storage class for recent writes.",
95 "enum": ["standard", "express"]
96 })
97 }
98}
99
100impl From<StorageClassSpec> for StorageClass {
101 fn from(s: StorageClassSpec) -> Self {
102 match s {
103 StorageClassSpec::Standard => StorageClass::Standard,
104 StorageClassSpec::Express => StorageClass::Express,
105 }
106 }
107}
108
/// Retention policy as written in a spec file.
///
/// Deserialized from a string: either `infinite` (matched case-insensitively) or
/// a duration in humantime format, which becomes [`RetentionPolicy::Age`].
#[derive(Debug, Clone, Copy)]
pub struct RetentionPolicySpec(pub RetentionPolicy);
112
113impl RetentionPolicySpec {
114 pub fn age_secs(self) -> Option<u64> {
115 self.0.age().map(|d| d.as_secs())
116 }
117}
118
119impl TryFrom<String> for RetentionPolicySpec {
120 type Error = String;
121
122 fn try_from(s: String) -> Result<Self, Self::Error> {
123 if s.eq_ignore_ascii_case("infinite") {
124 return Ok(RetentionPolicySpec(RetentionPolicy::Infinite()));
125 }
126 let d = humantime::parse_duration(&s)
127 .map_err(|e| format!("invalid retention_policy {:?}: {}", s, e))?;
128 Ok(RetentionPolicySpec(RetentionPolicy::Age(d)))
129 }
130}
131
132impl<'de> Deserialize<'de> for RetentionPolicySpec {
133 fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
134 let s = String::deserialize(d)?;
135 RetentionPolicySpec::try_from(s).map_err(serde::de::Error::custom)
136 }
137}
138
139impl schemars::JsonSchema for RetentionPolicySpec {
140 fn schema_name() -> Cow<'static, str> {
141 "RetentionPolicySpec".into()
142 }
143
144 fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
145 schemars::json_schema!({
146 "type": "string",
147 "description": "Retain records unless explicitly trimmed (\"infinite\"), or automatically \
148 trim records older than the given duration (e.g. \"7days\", \"1week\").",
149 "examples": ["infinite", "7days", "1week"]
150 })
151 }
152}
153
// Timestamping settings as written in a spec file. Unknown keys are rejected.
//
// Both fields are optional; a field left out is converted to `Maybe::Unspecified`
// when building a `TimestampingReconfiguration`.
//
// NOTE: plain `//` comments are used instead of `///` because schemars copies doc
// comments on `JsonSchema`-derived items into the generated schema.
#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct TimestampingSpec {
    // One of `"client-prefer"`, `"client-require"` or `"arrival"`.
    #[serde(default)]
    pub mode: Option<TimestampingModeSpec>,
    // Forwarded as `TimestampingReconfiguration::uncapped`.
    #[serde(default)]
    pub uncapped: Option<bool>,
}
165
/// Timestamping mode as spelled in a spec file.
///
/// Variants are (de)serialized with kebab-case names (`"client-prefer"`,
/// `"client-require"`, `"arrival"`) and map one-to-one onto
/// [`TimestampingMode`].
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum TimestampingModeSpec {
    ClientPrefer,
    ClientRequire,
    Arrival,
}
173
174impl schemars::JsonSchema for TimestampingModeSpec {
175 fn schema_name() -> Cow<'static, str> {
176 "TimestampingModeSpec".into()
177 }
178
179 fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
180 schemars::json_schema!({
181 "type": "string",
182 "description": "Timestamping mode for appends that influences how timestamps are handled.",
183 "enum": ["client-prefer", "client-require", "arrival"]
184 })
185 }
186}
187
188impl From<TimestampingModeSpec> for TimestampingMode {
189 fn from(m: TimestampingModeSpec) -> Self {
190 match m {
191 TimestampingModeSpec::ClientPrefer => TimestampingMode::ClientPrefer,
192 TimestampingModeSpec::ClientRequire => TimestampingMode::ClientRequire,
193 TimestampingModeSpec::Arrival => TimestampingMode::Arrival,
194 }
195 }
196}
197
// Delete-on-empty settings as written in a spec file. Unknown keys are rejected.
//
// NOTE: plain `//` comments are used instead of `///` because schemars copies doc
// comments on `JsonSchema`-derived items into the generated schema.
#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct DeleteOnEmptySpec {
    // Humantime duration string, forwarded as
    // `DeleteOnEmptyReconfiguration::min_age`; left unspecified when omitted.
    #[serde(default)]
    pub min_age: Option<HumanDuration>,
}
206
/// A [`Duration`] that is deserialized from a humantime-formatted string such as
/// `"1day"` or `"2h 30m"`.
#[derive(Debug, Clone, Copy)]
pub struct HumanDuration(pub Duration);
210
211impl TryFrom<String> for HumanDuration {
212 type Error = String;
213
214 fn try_from(s: String) -> Result<Self, Self::Error> {
215 humantime::parse_duration(&s)
216 .map(HumanDuration)
217 .map_err(|e| format!("invalid duration {:?}: {}", s, e))
218 }
219}
220
221impl<'de> Deserialize<'de> for HumanDuration {
222 fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
223 let s = String::deserialize(d)?;
224 HumanDuration::try_from(s).map_err(serde::de::Error::custom)
225 }
226}
227
228impl schemars::JsonSchema for HumanDuration {
229 fn schema_name() -> Cow<'static, str> {
230 "HumanDuration".into()
231 }
232
233 fn json_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
234 schemars::json_schema!({
235 "type": "string",
236 "description": "A duration string in humantime format, e.g. \"1day\", \"2h 30m\"",
237 "examples": ["1day", "2h 30m"]
238 })
239 }
240}
241
242impl From<BasinConfigSpec> for BasinReconfiguration {
243 fn from(s: BasinConfigSpec) -> Self {
244 BasinReconfiguration {
245 default_stream_config: s
246 .default_stream_config
247 .map(|dsc| Some(StreamReconfiguration::from(dsc)))
248 .map_or(Maybe::Unspecified, Maybe::Specified),
249 create_stream_on_append: s
250 .create_stream_on_append
251 .map_or(Maybe::Unspecified, Maybe::Specified),
252 create_stream_on_read: s
253 .create_stream_on_read
254 .map_or(Maybe::Unspecified, Maybe::Specified),
255 }
256 }
257}
258
259impl From<StreamConfigSpec> for StreamReconfiguration {
260 fn from(s: StreamConfigSpec) -> Self {
261 StreamReconfiguration {
262 storage_class: s
263 .storage_class
264 .map(|sc| Some(StorageClass::from(sc)))
265 .map_or(Maybe::Unspecified, Maybe::Specified),
266 retention_policy: s
267 .retention_policy
268 .map(|rp| Some(rp.0))
269 .map_or(Maybe::Unspecified, Maybe::Specified),
270 timestamping: s
271 .timestamping
272 .map(|ts| {
273 Some(TimestampingReconfiguration {
274 mode: ts
275 .mode
276 .map(|m| Some(TimestampingMode::from(m)))
277 .map_or(Maybe::Unspecified, Maybe::Specified),
278 uncapped: ts
279 .uncapped
280 .map(Some)
281 .map_or(Maybe::Unspecified, Maybe::Specified),
282 })
283 })
284 .map_or(Maybe::Unspecified, Maybe::Specified),
285 delete_on_empty: s
286 .delete_on_empty
287 .map(|doe| {
288 Some(DeleteOnEmptyReconfiguration {
289 min_age: doe
290 .min_age
291 .map(|h| Some(h.0))
292 .map_or(Maybe::Unspecified, Maybe::Specified),
293 })
294 })
295 .map_or(Maybe::Unspecified, Maybe::Specified),
296 }
297 }
298}
299
300pub fn json_schema() -> serde_json::Value {
301 serde_json::to_value(schemars::schema_for!(ResourcesSpec)).unwrap()
302}
303
304pub fn validate(spec: &ResourcesSpec) -> eyre::Result<()> {
305 let mut errors = Vec::new();
306 let mut seen_basins = std::collections::HashSet::new();
307
308 for basin_spec in &spec.basins {
309 if !seen_basins.insert(basin_spec.name.clone()) {
310 errors.push(format!("duplicate basin name {:?}", basin_spec.name));
311 }
312
313 if let Err(e) = basin_spec.name.parse::<BasinName>() {
314 errors.push(format!("invalid basin name {:?}: {}", basin_spec.name, e));
315 continue;
316 }
317
318 let mut seen_streams = std::collections::HashSet::new();
319 for stream_spec in &basin_spec.streams {
320 if !seen_streams.insert(stream_spec.name.clone()) {
321 errors.push(format!(
322 "duplicate stream name {:?} in basin {:?}",
323 stream_spec.name, basin_spec.name
324 ));
325 }
326 if let Err(e) = stream_spec.name.parse::<StreamName>() {
327 errors.push(format!(
328 "invalid stream name {:?} in basin {:?}: {}",
329 stream_spec.name, basin_spec.name, e
330 ));
331 }
332 }
333 }
334
335 if errors.is_empty() {
336 Ok(())
337 } else {
338 Err(eyre::eyre!("{}", errors.join("\n")))
339 }
340}
341
342pub fn load(path: &Path) -> eyre::Result<ResourcesSpec> {
343 let contents = std::fs::read_to_string(path)
344 .map_err(|e| eyre::eyre!("failed to read init file {:?}: {}", path, e))?;
345 let spec: ResourcesSpec = serde_json::from_str(&contents)
346 .map_err(|e| eyre::eyre!("failed to parse init file {:?}: {}", path, e))?;
347 Ok(spec)
348}
349
350pub async fn apply(backend: &Backend, spec: ResourcesSpec) -> eyre::Result<()> {
351 validate(&spec)?;
352
353 for basin_spec in spec.basins {
354 let basin: BasinName = basin_spec
355 .name
356 .parse()
357 .map_err(|e| eyre::eyre!("invalid basin name {:?}: {}", basin_spec.name, e))?;
358
359 let reconfiguration = basin_spec
360 .config
361 .map(BasinReconfiguration::from)
362 .unwrap_or_default();
363
364 backend
365 .create_basin(
366 basin.clone(),
367 reconfiguration,
368 CreateMode::CreateOrReconfigure,
369 )
370 .await
371 .map_err(|e| eyre::eyre!("failed to apply basin {:?}: {}", basin.as_ref(), e))?;
372
373 info!(basin = basin.as_ref(), "basin applied");
374
375 for stream_spec in basin_spec.streams {
376 let stream: StreamName = stream_spec
377 .name
378 .parse()
379 .map_err(|e| eyre::eyre!("invalid stream name {:?}: {}", stream_spec.name, e))?;
380
381 let reconfiguration = stream_spec
382 .config
383 .map(StreamReconfiguration::from)
384 .unwrap_or_default();
385
386 backend
387 .create_stream(
388 basin.clone(),
389 stream.clone(),
390 reconfiguration,
391 CreateMode::CreateOrReconfigure,
392 )
393 .await
394 .map_err(|e| {
395 eyre::eyre!(
396 "failed to apply stream {:?}/{:?}: {}",
397 basin.as_ref(),
398 stream.as_ref(),
399 e
400 )
401 })?;
402
403 info!(
404 basin = basin.as_ref(),
405 stream = stream.as_ref(),
406 "stream applied"
407 );
408 }
409 }
410 Ok(())
411}
412
// Unit tests covering spec deserialization, conversion into reconfiguration
// requests, validation, and the shape of the generated JSON schema.
#[cfg(test)]
mod tests {
    use super::*;

    // Parses a JSON literal into a `ResourcesSpec`, panicking on malformed input.
    fn parse_spec(json: &str) -> ResourcesSpec {
        serde_json::from_str(json).expect("valid JSON")
    }

    // An empty object is a valid spec with no basins.
    #[test]
    fn empty_spec() {
        let spec = parse_spec("{}");
        assert!(spec.basins.is_empty());
    }

    // `config` and `streams` are optional on a basin.
    #[test]
    fn basin_no_config() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin"}]}"#);
        assert_eq!(spec.basins.len(), 1);
        assert_eq!(spec.basins[0].name, "my-basin");
        assert!(spec.basins[0].config.is_none());
        assert!(spec.basins[0].streams.is_empty());
    }

    // The literal "infinite" maps to the infinite retention policy.
    #[test]
    fn retention_policy_infinite() {
        let rp: RetentionPolicySpec = serde_json::from_str(r#""infinite""#).expect("deserialize");
        assert!(matches!(rp.0, RetentionPolicy::Infinite()));
    }

    // A humantime duration maps to an age-based policy with that exact duration.
    #[test]
    fn retention_policy_duration() {
        let rp: RetentionPolicySpec = serde_json::from_str(r#""7days""#).expect("deserialize");
        assert!(matches!(rp.0, RetentionPolicy::Age(_)));
        if let RetentionPolicy::Age(d) = rp.0 {
            assert_eq!(d, Duration::from_secs(7 * 24 * 3600));
        }
    }

    // Strings that are neither "infinite" nor a duration are rejected.
    #[test]
    fn retention_policy_invalid() {
        let err = serde_json::from_str::<RetentionPolicySpec>(r#""not-a-duration""#);
        assert!(err.is_err());
    }

    // `HumanDuration` parses humantime strings.
    #[test]
    fn human_duration() {
        let hd: HumanDuration = serde_json::from_str(r#""1day""#).expect("deserialize");
        assert_eq!(hd.0, Duration::from_secs(86400));
    }

    // Parses a spec that sets every supported field and checks each parsed value.
    #[test]
    fn full_spec_roundtrip() {
        let json = r#"
        {
            "basins": [
                {
                    "name": "my-basin",
                    "config": {
                        "create_stream_on_append": true,
                        "create_stream_on_read": false,
                        "default_stream_config": {
                            "storage_class": "express",
                            "retention_policy": "7days",
                            "timestamping": {
                                "mode": "client-prefer",
                                "uncapped": false
                            },
                            "delete_on_empty": {
                                "min_age": "1day"
                            }
                        }
                    },
                    "streams": [
                        {
                            "name": "events",
                            "config": {
                                "storage_class": "standard",
                                "retention_policy": "infinite"
                            }
                        }
                    ]
                }
            ]
        }"#;

        let spec = parse_spec(json);
        assert_eq!(spec.basins.len(), 1);
        let basin = &spec.basins[0];
        assert_eq!(basin.name, "my-basin");

        let config = basin.config.as_ref().unwrap();
        assert_eq!(config.create_stream_on_append, Some(true));
        assert_eq!(config.create_stream_on_read, Some(false));

        let dsc = config.default_stream_config.as_ref().unwrap();
        assert!(matches!(dsc.storage_class, Some(StorageClassSpec::Express)));
        assert!(matches!(
            dsc.retention_policy.as_ref().map(|r| &r.0),
            Some(RetentionPolicy::Age(_))
        ));

        let ts = dsc.timestamping.as_ref().unwrap();
        assert!(matches!(ts.mode, Some(TimestampingModeSpec::ClientPrefer)));
        assert_eq!(ts.uncapped, Some(false));

        let doe = dsc.delete_on_empty.as_ref().unwrap();
        assert_eq!(
            doe.min_age.as_ref().map(|h| h.0),
            Some(Duration::from_secs(86400))
        );

        assert_eq!(basin.streams.len(), 1);
        let stream = &basin.streams[0];
        assert_eq!(stream.name, "events");
        let sc = stream.config.as_ref().unwrap();
        assert!(matches!(sc.storage_class, Some(StorageClassSpec::Standard)));
        assert!(matches!(
            sc.retention_policy.as_ref().map(|r| &r.0),
            Some(RetentionPolicy::Infinite())
        ));
    }

    // Present fields become `Specified`, omitted fields stay `Unspecified`.
    #[test]
    fn basin_config_conversion() {
        let spec = BasinConfigSpec {
            default_stream_config: None,
            create_stream_on_append: Some(true),
            create_stream_on_read: None,
        };
        let reconfig = BasinReconfiguration::from(spec);
        assert!(matches!(
            reconfig.create_stream_on_append,
            Maybe::Specified(true)
        ));
        assert!(matches!(reconfig.create_stream_on_read, Maybe::Unspecified));
        assert!(matches!(reconfig.default_stream_config, Maybe::Unspecified));
    }

    // A well-formed spec with distinct names passes validation.
    #[test]
    fn validate_valid_spec() {
        let spec = parse_spec(
            r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"logs"}]}]}"#,
        );
        assert!(validate(&spec).is_ok());
    }

    // A basin name that does not parse as `BasinName` is reported.
    #[test]
    fn validate_invalid_basin_name() {
        let spec = parse_spec(r#"{"basins":[{"name":"INVALID_BASIN"}]}"#);
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("invalid basin name"));
    }

    // An empty stream name is reported as invalid.
    #[test]
    fn validate_invalid_stream_name() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin","streams":[{"name":""}]}]}"#);
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("invalid stream name"));
    }

    // The same basin name appearing twice is reported.
    #[test]
    fn validate_duplicate_basin_names() {
        let spec = parse_spec(r#"{"basins":[{"name":"my-basin"},{"name":"my-basin"}]}"#);
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("duplicate basin name"));
    }

    // The same stream name appearing twice within one basin is reported.
    #[test]
    fn validate_duplicate_stream_names() {
        let spec = parse_spec(
            r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"events"}]}]}"#,
        );
        let err = validate(&spec).unwrap_err();
        assert!(err.to_string().contains("duplicate stream name"));
    }

    // Validation accumulates problems instead of stopping at the first one.
    #[test]
    fn validate_multiple_errors() {
        let spec = parse_spec(r#"{"basins":[{"name":"INVALID"},{"name":"INVALID"}]}"#);
        let err = validate(&spec).unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("invalid basin name"));
        assert!(msg.contains("duplicate basin name"));
    }

    // The generated schema is a draft 2020-12 object schema exposing `basins`.
    #[test]
    fn json_schema_is_valid() {
        let schema = json_schema();
        assert!(schema.is_object());
        let schema_obj = schema.as_object().unwrap();

        // The `$schema` keyword pins the JSON Schema dialect.
        assert_eq!(
            schema_obj.get("$schema"),
            Some(&serde_json::Value::String(
                "https://json-schema.org/draft/2020-12/schema".to_string()
            ))
        );

        assert!(
            schema_obj.contains_key("properties"),
            "schema should have root properties"
        );

        assert!(
            schema_obj.contains_key("$defs"),
            "schema should have $defs for reusable definitions"
        );

        let properties = schema_obj.get("properties").unwrap().as_object().unwrap();
        assert!(
            properties.contains_key("basins"),
            "schema should include the `basins` property"
        );
    }

    // Present fields become `Specified(Some(..))`, omitted fields stay `Unspecified`.
    #[test]
    fn stream_config_conversion() {
        let spec = StreamConfigSpec {
            storage_class: Some(StorageClassSpec::Standard),
            retention_policy: Some(RetentionPolicySpec(RetentionPolicy::Infinite())),
            timestamping: None,
            delete_on_empty: None,
        };
        let reconfig = StreamReconfiguration::from(spec);
        assert!(matches!(
            reconfig.storage_class,
            Maybe::Specified(Some(StorageClass::Standard))
        ));
        assert!(matches!(
            reconfig.retention_policy,
            Maybe::Specified(Some(RetentionPolicy::Infinite()))
        ));
        assert!(matches!(reconfig.timestamping, Maybe::Unspecified));
        assert!(matches!(reconfig.delete_on_empty, Maybe::Unspecified));
    }
}