use serde::{Deserialize, Serialize};
use std::path::PathBuf;

/// Where records come from: a JSON-lines file or a Kafka topic.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct SourceConfig {
    pub kind: String,
    pub path: PathBuf,
    pub time_field: String,
    #[serde(default)]
    pub bootstrap_servers: Option<String>,
    #[serde(default)]
    pub topic: Option<String>,
    #[serde(default)]
    pub group_id: Option<String>,
    #[serde(default)]
    pub auto_offset_reset: Option<String>,
    #[serde(default)]
    pub commit_interval_ms: Option<u64>,
}

/// Event-time settings; `allowed_lateness` is a duration string
/// understood by `parse_duration_ms` (e.g., "5s").
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct TimeConfig {
    pub allowed_lateness: String,
}

/// Windowing: `kind` selects the window type ("tumbling" is exercised in the
/// tests below); `size` applies to all kinds, while `slide` and `gap` are
/// only meaningful for sliding and session windows respectively.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct WindowConfig {
    #[serde(rename = "type")]
    pub kind: String,
    pub size: String,
    #[serde(default)]
    pub slide: Option<String>,
    #[serde(default)]
    pub gap: Option<String>,
}

/// Per-window operations; currently a count keyed by the `count_by` field.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
pub struct OpsConfig {
    #[serde(default)]
    pub count_by: Option<String>,
}

/// Where results go: files (e.g., Parquet) under `out_dir`, or a Kafka topic.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct SinkConfig {
    pub kind: String,
    #[serde(default)]
    pub out_dir: PathBuf,
    #[serde(default)]
    pub bootstrap_servers: Option<String>,
    #[serde(default)]
    pub topic: Option<String>,
    #[serde(default)]
    pub acks: Option<String>,
    #[serde(default)]
    pub compression: Option<String>,
    #[serde(default)]
    pub max_bytes: Option<u64>,
    #[serde(default)]
    pub partition_field: Option<String>,
    #[serde(default)]
    pub partition_format: Option<String>,
}

/// Top-level pipeline configuration: one source, one windowed op, one sink.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct PipelineConfig {
    pub source: SourceConfig,
    pub time: TimeConfig,
    pub window: WindowConfig,
    pub ops: OpsConfig,
    pub sink: SinkConfig,
}
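
// Shape of a matching config document. The field names and the `type` rename
// are taken from the structs above; the values and the TOML format itself are
// illustrative assumptions:
//
//   [source]
//   kind = "file"
//   path = "/tmp/input.jsonl"
//   time_field = "event_time"
//
//   [time]
//   allowed_lateness = "5s"
//
//   [window]
//   type = "tumbling"
//   size = "60s"
//
//   [ops]
//   count_by = "word"
//
//   [sink]
//   kind = "parquet"
//   out_dir = "/tmp/out"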

impl PipelineConfig {
    /// Checks cross-field invariants that serde's defaults cannot enforce.
    pub fn validate(&self) -> anyhow::Result<()> {
        match self.source.kind.as_str() {
            "file" => {
                if self.source.path.as_os_str().is_empty() {
                    anyhow::bail!("source.path must be set for file source");
                }
            }
            "kafka" => {
                if self.source.bootstrap_servers.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("source.bootstrap_servers must be set for kafka source");
                }
                if self.source.topic.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("source.topic must be set for kafka source");
                }
                if self.source.group_id.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("source.group_id must be set for kafka source");
                }
            }
            other => anyhow::bail!("unsupported source kind: {}", other),
        }
        match self.sink.kind.as_str() {
            "parquet" | "file" => {
                // out_dir falls back to an empty path via #[serde(default)],
                // so require it explicitly for file-based sinks, mirroring the
                // source.path check above.
                if self.sink.out_dir.as_os_str().is_empty() {
                    anyhow::bail!("sink.out_dir must be set for {} sink", self.sink.kind);
                }
            }
            "kafka" => {
                if self.sink.bootstrap_servers.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("sink.bootstrap_servers must be set for kafka sink");
                }
                if self.sink.topic.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("sink.topic must be set for kafka sink");
                }
            }
            other => anyhow::bail!("unsupported sink kind: {}", other),
        }
        if self.ops.count_by.is_none() {
            anyhow::bail!("ops.count_by must be set (e.g., word field)");
        }
        Ok(())
    }
}
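
// A minimal loader sketch (not wired in): the `toml` crate used here is an
// assumption, not a declared dependency of this module; swap in whatever
// deserializer the project actually uses.
//
// pub fn load(path: &std::path::Path) -> anyhow::Result<PipelineConfig> {
//     let text = std::fs::read_to_string(path)?;
//     let cfg: PipelineConfig = toml::from_str(&text)?;
//     cfg.validate()?;
//     Ok(cfg)
// }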

/// Parses a human-friendly duration such as "500ms", "10s", "2m", or "1h"
/// into milliseconds. A bare number is interpreted as seconds.
pub fn parse_duration_ms(s: &str) -> anyhow::Result<i64> {
    let s = s.trim();
    // "ms" must be matched before the bare 's' suffix.
    if let Some(num) = s.strip_suffix("ms") {
        return Ok(num.parse::<i64>()?);
    }
    if let Some(num) = s.strip_suffix('s') {
        return Ok(num.parse::<i64>()? * 1_000);
    }
    if let Some(num) = s.strip_suffix('m') {
        return Ok(num.parse::<i64>()? * 60_000);
    }
    if let Some(num) = s.strip_suffix('h') {
        return Ok(num.parse::<i64>()? * 3_600_000);
    }
    Ok(s.parse::<i64>()? * 1_000)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_duration_suffixes() {
        assert_eq!(parse_duration_ms("500ms").unwrap(), 500);
        assert_eq!(parse_duration_ms("10s").unwrap(), 10_000);
        assert_eq!(parse_duration_ms("2m").unwrap(), 120_000);
        assert_eq!(parse_duration_ms("1h").unwrap(), 3_600_000);
        assert_eq!(parse_duration_ms("42").unwrap(), 42_000);
    }
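
    // Companion test: inputs with no recognized suffix fall through to a bare
    // seconds parse, so non-numeric strings should surface a parse error.
    #[test]
    fn rejects_malformed_durations() {
        assert!(parse_duration_ms("").is_err());
        assert!(parse_duration_ms("abc").is_err());
        assert!(parse_duration_ms("5x").is_err());
    }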

    #[test]
    fn validate_file_pipeline_ok() {
        let cfg = PipelineConfig {
            source: SourceConfig {
                kind: "file".into(),
                path: PathBuf::from("/tmp/input.jsonl"),
                time_field: "event_time".into(),
                bootstrap_servers: None,
                topic: None,
                group_id: None,
                auto_offset_reset: None,
                commit_interval_ms: None,
            },
            time: TimeConfig { allowed_lateness: "0s".into() },
            window: WindowConfig { kind: "tumbling".into(), size: "60s".into(), slide: None, gap: None },
            ops: OpsConfig { count_by: Some("word".into()) },
            sink: SinkConfig {
                kind: "parquet".into(),
                out_dir: PathBuf::from("/tmp/out"),
                bootstrap_servers: None,
                topic: None,
                acks: None,
                compression: None,
                max_bytes: None,
                partition_field: None,
                partition_format: None,
            },
        };
        cfg.validate().unwrap();
    }
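
    // The file source requires a non-empty path, so a default PathBuf fails.
    #[test]
    fn validate_empty_file_path_errors() {
        let cfg = PipelineConfig {
            source: SourceConfig {
                kind: "file".into(),
                path: PathBuf::new(),
                time_field: "ts".into(),
                bootstrap_servers: None,
                topic: None,
                group_id: None,
                auto_offset_reset: None,
                commit_interval_ms: None,
            },
            time: TimeConfig { allowed_lateness: "0s".into() },
            window: WindowConfig { kind: "tumbling".into(), size: "60s".into(), slide: None, gap: None },
            ops: OpsConfig { count_by: Some("word".into()) },
            sink: SinkConfig {
                kind: "parquet".into(),
                out_dir: PathBuf::from("/tmp/out"),
                bootstrap_servers: None,
                topic: None,
                acks: None,
                compression: None,
                max_bytes: None,
                partition_field: None,
                partition_format: None,
            },
        };
        assert!(cfg.validate().is_err());
    }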

    #[test]
    fn validate_kafka_pipeline_ok() {
        let cfg = PipelineConfig {
            source: SourceConfig {
                kind: "kafka".into(),
                path: PathBuf::from("unused"),
                time_field: "event_time".into(),
                bootstrap_servers: Some("localhost:9092".into()),
                topic: Some("t".into()),
                group_id: Some("g".into()),
                auto_offset_reset: Some("earliest".into()),
                commit_interval_ms: Some(1000),
            },
            time: TimeConfig { allowed_lateness: "5s".into() },
            window: WindowConfig { kind: "tumbling".into(), size: "60s".into(), slide: None, gap: None },
            ops: OpsConfig { count_by: Some("word".into()) },
            sink: SinkConfig {
                kind: "kafka".into(),
                out_dir: PathBuf::from("./out"),
                bootstrap_servers: Some("localhost:9092".into()),
                topic: Some("t2".into()),
                acks: Some("all".into()),
                compression: None,
                max_bytes: None,
                partition_field: None,
                partition_format: None,
            },
        };
        cfg.validate().unwrap();
    }
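
    // Each required kafka source field is checked in validate(); dropping
    // bootstrap_servers alone should be enough to make validation fail.
    #[test]
    fn validate_kafka_missing_bootstrap_errors() {
        let cfg = PipelineConfig {
            source: SourceConfig {
                kind: "kafka".into(),
                path: PathBuf::new(),
                time_field: "event_time".into(),
                bootstrap_servers: None,
                topic: Some("t".into()),
                group_id: Some("g".into()),
                auto_offset_reset: None,
                commit_interval_ms: None,
            },
            time: TimeConfig { allowed_lateness: "0s".into() },
            window: WindowConfig { kind: "tumbling".into(), size: "60s".into(), slide: None, gap: None },
            ops: OpsConfig { count_by: Some("word".into()) },
            sink: SinkConfig {
                kind: "parquet".into(),
                out_dir: PathBuf::from("/tmp/out"),
                bootstrap_servers: None,
                topic: None,
                acks: None,
                compression: None,
                max_bytes: None,
                partition_field: None,
                partition_format: None,
            },
        };
        assert!(cfg.validate().is_err());
    }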

    #[test]
    fn validate_missing_ops_count_by_errors() {
        let cfg = PipelineConfig {
            source: SourceConfig {
                kind: "file".into(),
                path: PathBuf::from("/tmp/in"),
                time_field: "ts".into(),
                bootstrap_servers: None,
                topic: None,
                group_id: None,
                auto_offset_reset: None,
                commit_interval_ms: None,
            },
            time: TimeConfig { allowed_lateness: "0s".into() },
            window: WindowConfig { kind: "tumbling".into(), size: "60s".into(), slide: None, gap: None },
            ops: OpsConfig { count_by: None },
            sink: SinkConfig {
                kind: "parquet".into(),
                out_dir: PathBuf::from("/tmp/out"),
                bootstrap_servers: None,
                topic: None,
                acks: None,
                compression: None,
                max_bytes: None,
                partition_field: None,
                partition_format: None,
            },
        };
        assert!(cfg.validate().is_err());
    }
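
    // Any kind other than those matched in validate() should be rejected;
    // "http" here is just an arbitrary unsupported value.
    #[test]
    fn validate_unsupported_source_kind_errors() {
        let cfg = PipelineConfig {
            source: SourceConfig {
                kind: "http".into(),
                path: PathBuf::from("/tmp/in"),
                time_field: "ts".into(),
                bootstrap_servers: None,
                topic: None,
                group_id: None,
                auto_offset_reset: None,
                commit_interval_ms: None,
            },
            time: TimeConfig { allowed_lateness: "0s".into() },
            window: WindowConfig { kind: "tumbling".into(), size: "60s".into(), slide: None, gap: None },
            ops: OpsConfig { count_by: Some("word".into()) },
            sink: SinkConfig {
                kind: "parquet".into(),
                out_dir: PathBuf::from("/tmp/out"),
                bootstrap_servers: None,
                topic: None,
                acks: None,
                compression: None,
                max_bytes: None,
                partition_field: None,
                partition_format: None,
            },
        };
        assert!(cfg.validate().is_err());
    }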
}