use serde::{Deserialize, Serialize};
use std::path::PathBuf;

/// Where events come from; `kind` selects the `file` or `kafka` input, and the
/// kafka-only fields stay `None` for file sources.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct SourceConfig {
    pub kind: String,
    pub path: PathBuf,
    pub time_field: String,
    #[serde(default)]
    pub format: Option<String>,
    #[serde(default)]
    pub bootstrap_servers: Option<String>,
    #[serde(default)]
    pub topic: Option<String>,
    #[serde(default)]
    pub group_id: Option<String>,
    #[serde(default)]
    pub auto_offset_reset: Option<String>,
    #[serde(default)]
    pub commit_interval_ms: Option<u64>,
}

/// Event-time settings (currently just the allowed lateness).
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct TimeConfig {
    pub allowed_lateness: String,
}

/// Windowing strategy; `size` is required, `slide` and `gap` are optional and
/// window-type specific.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct WindowConfig {
    #[serde(rename = "type")]
    pub kind: String,
    pub size: String,
    #[serde(default)]
    pub slide: Option<String>,
    #[serde(default)]
    pub gap: Option<String>,
}

/// Per-window operations: group by `count_by`, optionally aggregating `agg_field`.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
pub struct OpsConfig {
    #[serde(default)]
    pub count_by: Option<String>,
    #[serde(default)]
    pub agg: Option<String>,
    #[serde(default)]
    pub agg_field: Option<String>,
}

/// Where results go; `kind` selects between file/parquet output and a kafka topic.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct SinkConfig {
    pub kind: String,
    #[serde(default)]
    pub out_dir: PathBuf,
    #[serde(default)]
    pub bootstrap_servers: Option<String>,
    #[serde(default)]
    pub topic: Option<String>,
    #[serde(default)]
    pub acks: Option<String>,
    #[serde(default)]
    pub compression: Option<String>,
    #[serde(default)]
    pub max_bytes: Option<u64>,
    #[serde(default)]
    pub partition_field: Option<String>,
    #[serde(default)]
    pub partition_format: Option<String>,
}

/// Top-level pipeline configuration, deserialized with serde and checked by `validate`.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct PipelineConfig {
    pub source: SourceConfig,
    pub time: TimeConfig,
    pub window: WindowConfig,
    pub ops: OpsConfig,
    pub sink: SinkConfig,
}
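
// Illustrative only (not taken from the project's own docs or sources): if the
// pipeline config is loaded from JSON, a minimal file-to-parquet setup that
// satisfies `PipelineConfig::validate` could look like this. Field names follow
// the serde attributes above (note `type` for the window kind).
//
// {
//   "source": { "kind": "file", "path": "/tmp/input.jsonl", "time_field": "event_time" },
//   "time":   { "allowed_lateness": "0s" },
//   "window": { "type": "tumbling", "size": "60s" },
//   "ops":    { "count_by": "word" },
//   "sink":   { "kind": "parquet", "out_dir": "/tmp/out" }
// }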

impl PipelineConfig {
    /// Cross-field checks that the serde derive alone cannot express, e.g. kafka
    /// settings are only required when the matching `kind` is "kafka".
    pub fn validate(&self) -> anyhow::Result<()> {
        match self.source.kind.as_str() {
            "file" => {
                if self.source.path.as_os_str().is_empty() {
                    anyhow::bail!("source.path must be set for file source");
                }
                if let Some(fmt) = self.source.format.as_deref() {
                    match fmt {
                        "jsonl" | "csv" => {}
                        other => anyhow::bail!("unsupported source.format: {}", other),
                    }
                }
            }
            "kafka" => {
                if self.source.bootstrap_servers.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("source.bootstrap_servers must be set for kafka source");
                }
                if self.source.topic.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("source.topic must be set for kafka source");
                }
                if self.source.group_id.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("source.group_id must be set for kafka source");
                }
            }
            other => anyhow::bail!("unsupported source kind: {}", other),
        }
        match self.sink.kind.as_str() {
            "parquet" | "file" => {}
            "kafka" => {
                if self.sink.bootstrap_servers.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("sink.bootstrap_servers must be set for kafka sink");
                }
                if self.sink.topic.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("sink.topic must be set for kafka sink");
                }
            }
            other => anyhow::bail!("unsupported sink kind: {}", other),
        }
        if self.ops.count_by.is_none() {
            anyhow::bail!("ops.count_by must be set (e.g., word field)");
        }
        let agg = self.ops.agg.as_deref().unwrap_or("count");
        match agg {
            "count" => {}
            "sum" | "avg" | "distinct" => {
                if self.ops.agg_field.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("ops.agg_field must be set when ops.agg is sum|avg|distinct");
                }
            }
            other => anyhow::bail!("unsupported ops.agg: {}", other),
        }
        Ok(())
    }
}
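
// A minimal loading sketch (not part of the original module), kept as a comment
// because it assumes a JSON config file and a `serde_json` dependency that this
// crate may not declare: read the file, deserialize, then run `validate`.
//
// pub fn load_pipeline_config(path: &std::path::Path) -> anyhow::Result<PipelineConfig> {
//     let raw = std::fs::read_to_string(path)?;
//     let cfg: PipelineConfig = serde_json::from_str(&raw)?;
//     cfg.validate()?;
//     Ok(cfg)
// }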

/// Parses a human-readable duration ("500ms", "10s", "2m", "1h") into
/// milliseconds; a bare number is interpreted as seconds.
pub fn parse_duration_ms(s: &str) -> anyhow::Result<i64> {
    let s = s.trim();
    // Check "ms" before the single-char suffixes so "500ms" is not read as seconds.
    if let Some(num) = s.strip_suffix("ms") {
        return Ok(num.parse::<i64>()?);
    }
    if let Some(num) = s.strip_suffix('s') {
        return Ok(num.parse::<i64>()? * 1_000);
    }
    if let Some(num) = s.strip_suffix('m') {
        return Ok(num.parse::<i64>()? * 60_000);
    }
    if let Some(num) = s.strip_suffix('h') {
        return Ok(num.parse::<i64>()? * 3_600_000);
    }
    Ok(s.parse::<i64>()? * 1_000)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_duration_suffixes() {
        assert_eq!(parse_duration_ms("500ms").unwrap(), 500);
        assert_eq!(parse_duration_ms("10s").unwrap(), 10_000);
        assert_eq!(parse_duration_ms("2m").unwrap(), 120_000);
        assert_eq!(parse_duration_ms("1h").unwrap(), 3_600_000);
        assert_eq!(parse_duration_ms("42").unwrap(), 42_000);
    }
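
    // Additional sketch test (not in the original suite): strings that are
    // neither a supported suffix form nor a bare integer should fail to parse.
    #[test]
    fn rejects_invalid_duration_strings() {
        assert!(parse_duration_ms("abc").is_err());
        assert!(parse_duration_ms("10x").is_err());
        assert!(parse_duration_ms("").is_err());
    }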

    #[test]
    fn validate_file_pipeline_ok() {
        let cfg = PipelineConfig {
            source: SourceConfig {
                kind: "file".into(),
                path: PathBuf::from("/tmp/input.jsonl"),
                time_field: "event_time".into(),
                format: None,
                bootstrap_servers: None,
                topic: None,
                group_id: None,
                auto_offset_reset: None,
                commit_interval_ms: None,
            },
            time: TimeConfig {
                allowed_lateness: "0s".into(),
            },
            window: WindowConfig {
                kind: "tumbling".into(),
                size: "60s".into(),
                slide: None,
                gap: None,
            },
            ops: OpsConfig {
                count_by: Some("word".into()),
                ..Default::default()
            },
            sink: SinkConfig {
                kind: "parquet".into(),
                out_dir: PathBuf::from("/tmp/out"),
                bootstrap_servers: None,
                topic: None,
                acks: None,
                compression: None,
                max_bytes: None,
                partition_field: None,
                partition_format: None,
            },
        };
        cfg.validate().unwrap();
    }

    #[test]
    fn validate_kafka_pipeline_ok() {
        let cfg = PipelineConfig {
            source: SourceConfig {
                kind: "kafka".into(),
                path: PathBuf::from("unused"),
                time_field: "event_time".into(),
                format: None,
                bootstrap_servers: Some("localhost:9092".into()),
                topic: Some("t".into()),
                group_id: Some("g".into()),
                auto_offset_reset: Some("earliest".into()),
                commit_interval_ms: Some(1000),
            },
            time: TimeConfig {
                allowed_lateness: "5s".into(),
            },
            window: WindowConfig {
                kind: "tumbling".into(),
                size: "60s".into(),
                slide: None,
                gap: None,
            },
            ops: OpsConfig {
                count_by: Some("word".into()),
                ..Default::default()
            },
            sink: SinkConfig {
                kind: "kafka".into(),
                out_dir: PathBuf::from("./out"),
                bootstrap_servers: Some("localhost:9092".into()),
                topic: Some("t2".into()),
                acks: Some("all".into()),
                compression: None,
                max_bytes: None,
                partition_field: None,
                partition_format: None,
            },
        };
        cfg.validate().unwrap();
    }
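
    // Additional sketch test (not in the original suite): a kafka sink without a
    // topic should be rejected even when the source side is valid. Field values
    // are illustrative.
    #[test]
    fn validate_kafka_sink_missing_topic_errors() {
        let cfg = PipelineConfig {
            source: SourceConfig {
                kind: "file".into(),
                path: PathBuf::from("/tmp/in"),
                time_field: "ts".into(),
                format: None,
                bootstrap_servers: None,
                topic: None,
                group_id: None,
                auto_offset_reset: None,
                commit_interval_ms: None,
            },
            time: TimeConfig {
                allowed_lateness: "0s".into(),
            },
            window: WindowConfig {
                kind: "tumbling".into(),
                size: "60s".into(),
                slide: None,
                gap: None,
            },
            ops: OpsConfig {
                count_by: Some("word".into()),
                ..Default::default()
            },
            sink: SinkConfig {
                kind: "kafka".into(),
                out_dir: PathBuf::from("./out"),
                bootstrap_servers: Some("localhost:9092".into()),
                topic: None,
                acks: None,
                compression: None,
                max_bytes: None,
                partition_field: None,
                partition_format: None,
            },
        };
        assert!(cfg.validate().is_err());
    }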

    #[test]
    fn validate_missing_ops_count_by_errors() {
        let cfg = PipelineConfig {
            source: SourceConfig {
                kind: "file".into(),
                path: PathBuf::from("/tmp/in"),
                time_field: "ts".into(),
                format: None,
                bootstrap_servers: None,
                topic: None,
                group_id: None,
                auto_offset_reset: None,
                commit_interval_ms: None,
            },
            time: TimeConfig {
                allowed_lateness: "0s".into(),
            },
            window: WindowConfig {
                kind: "tumbling".into(),
                size: "60s".into(),
                slide: None,
                gap: None,
            },
            ops: OpsConfig {
                count_by: None,
                ..Default::default()
            },
            sink: SinkConfig {
                kind: "parquet".into(),
                out_dir: PathBuf::from("/tmp/out"),
                bootstrap_servers: None,
                topic: None,
                acks: None,
                compression: None,
                max_bytes: None,
                partition_field: None,
                partition_format: None,
            },
        };
        assert!(cfg.validate().is_err());
    }
}