pulse_core/config.rs

use serde::{Deserialize, Serialize};
use std::path::PathBuf;

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct SourceConfig {
    pub kind: String,       // "file" | "kafka"
    pub path: PathBuf,      // path to the input file (file source)
    pub time_field: String, // e.g., "event_time" or "ts"
    #[serde(default)]
    pub format: Option<String>, // "jsonl" | "csv" (file source)
    // Kafka-specific (used when kind == "kafka")
    #[serde(default)]
    pub bootstrap_servers: Option<String>,
    #[serde(default)]
    pub topic: Option<String>,
    #[serde(default)]
    pub group_id: Option<String>,
    #[serde(default)]
    pub auto_offset_reset: Option<String>, // "earliest" | "latest"
    #[serde(default)]
    pub commit_interval_ms: Option<u64>,
}
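
// Illustrative sketch (not from this crate): how a `source` section might look in a
// pipeline file, assuming the config is deserialized with serde from YAML. Field names
// match SourceConfig above; the file layout itself is hypothetical.
//
//   source:                       # file source
//     kind: file
//     path: ./data/events.jsonl
//     time_field: event_time
//     format: jsonl
//
//   source:                       # kafka source
//     kind: kafka
//     path: ""                    # unused for kafka, but the field is not defaulted
//     time_field: event_time
//     bootstrap_servers: localhost:9092
//     topic: events
//     group_id: pulse
//     auto_offset_reset: earliest
//     commit_interval_ms: 5000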

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct TimeConfig {
    pub allowed_lateness: String, // e.g., "10s"
}

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct WindowConfig {
    #[serde(rename = "type")]
    pub kind: String, // "tumbling" | "sliding" | "session"
    pub size: String, // e.g., "60s"
    #[serde(default)]
    pub slide: Option<String>, // slide interval for sliding windows
    #[serde(default)]
    pub gap: Option<String>, // inactivity gap for session windows
}
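
// Illustrative sketch: possible `time` and `window` sections in the same hypothetical
// YAML file. Duration strings use the units accepted by parse_duration_ms below.
//
//   time:
//     allowed_lateness: 10s
//
//   window:                       # tumbling
//     type: tumbling
//     size: 60s
//
//   window:                       # sliding
//     type: sliding
//     size: 60s
//     slide: 10s
//
//   window:                       # session
//     type: session
//     size: 60s
//     gap: 30s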

#[derive(Debug, Deserialize, Serialize, Clone, Default)]
pub struct OpsConfig {
    #[serde(default)]
    pub count_by: Option<String>, // field to group by (required)
    /// Aggregation to perform: count|sum|avg|distinct (default: count)
    #[serde(default)]
    pub agg: Option<String>,
    /// Field used for sum/avg/distinct (required when agg is not count)
    #[serde(default)]
    pub agg_field: Option<String>,
}
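
// Illustrative sketch: `ops` sections for a plain count and for a sum aggregation,
// matching the validation rules in PipelineConfig::validate below. Field values are examples.
//
//   ops:
//     count_by: word              # count events per word (agg defaults to count)
//
//   ops:
//     count_by: user_id
//     agg: sum
//     agg_field: amount           # agg_field is required for sum|avg|distinct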

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct SinkConfig {
    pub kind: String, // "parquet" | "file" | "kafka"
    #[serde(default)]
    pub out_dir: PathBuf, // output directory for parquet, or the output file path for the file sink
    // Kafka-specific (when kind == "kafka")
    #[serde(default)]
    pub bootstrap_servers: Option<String>,
    #[serde(default)]
    pub topic: Option<String>,
    #[serde(default)]
    pub acks: Option<String>, // "all" | "1" | "0"
    // Parquet-specific options (when kind == "parquet")
    #[serde(default)]
    pub compression: Option<String>, // "none" | "snappy" | "zstd"
    #[serde(default)]
    pub max_bytes: Option<u64>,
    #[serde(default)]
    pub partition_field: Option<String>, // defaults to event_time if None
    #[serde(default)]
    pub partition_format: Option<String>, // e.g., "%Y-%m-%d" for date partitions
}
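
// Illustrative sketch: `sink` sections for a partitioned parquet sink and a kafka sink.
// Values shown are only the options documented on SinkConfig above; sizes are examples.
//
//   sink:                         # parquet
//     kind: parquet
//     out_dir: ./out
//     compression: zstd
//     max_bytes: 134217728        # roll files at roughly 128 MiB
//     partition_field: event_time
//     partition_format: "%Y-%m-%d"
//
//   sink:                         # kafka
//     kind: kafka
//     bootstrap_servers: localhost:9092
//     topic: word-counts
//     acks: all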

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct PipelineConfig {
    pub source: SourceConfig,
    pub time: TimeConfig,
    pub window: WindowConfig,
    pub ops: OpsConfig,
    pub sink: SinkConfig,
}

impl PipelineConfig {
    /// Checks that the source, sink, and ops sections are complete and consistent.
    pub fn validate(&self) -> anyhow::Result<()> {
        match self.source.kind.as_str() {
            "file" => {
                if self.source.path.as_os_str().is_empty() {
                    anyhow::bail!("source.path must be set for file source");
                }
                if let Some(fmt) = self.source.format.as_deref() {
                    match fmt {
                        "jsonl" | "csv" => {}
                        other => anyhow::bail!("unsupported source.format: {}", other),
                    }
                }
            }
            "kafka" => {
                if self.source.bootstrap_servers.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("source.bootstrap_servers must be set for kafka source");
                }
                if self.source.topic.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("source.topic must be set for kafka source");
                }
                if self.source.group_id.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("source.group_id must be set for kafka source");
                }
            }
            other => anyhow::bail!("unsupported source kind: {}", other),
        }
        match self.sink.kind.as_str() {
            "parquet" | "file" => {}
            "kafka" => {
                if self.sink.bootstrap_servers.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("sink.bootstrap_servers must be set for kafka sink");
                }
                if self.sink.topic.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("sink.topic must be set for kafka sink");
                }
            }
            other => anyhow::bail!("unsupported sink kind: {}", other),
        }
        if self.ops.count_by.is_none() {
            anyhow::bail!("ops.count_by must be set (e.g., \"word\")");
        }
        // Validate the aggregation selection; count is the default and needs no field.
        let agg = self.ops.agg.as_deref().unwrap_or("count");
        match agg {
            "count" => {}
            "sum" | "avg" | "distinct" => {
                if self.ops.agg_field.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("ops.agg_field must be set when ops.agg is sum|avg|distinct");
                }
            }
            other => anyhow::bail!("unsupported ops.agg: {}", other),
        }
        Ok(())
    }
}
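
// A minimal sketch (not part of this module) of how a pipeline file could be loaded and
// validated end to end. It assumes the config is written in YAML and that `serde_yaml`
// is available as a dependency; the function name `load_pipeline_config` is hypothetical.
//
// pub fn load_pipeline_config(path: &std::path::Path) -> anyhow::Result<PipelineConfig> {
//     let raw = std::fs::read_to_string(path)?;              // read the config text
//     let cfg: PipelineConfig = serde_yaml::from_str(&raw)?;  // deserialize via serde
//     cfg.validate()?;                                        // apply the checks above
//     Ok(cfg)
// }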

/// Parses a small duration grammar ("500ms", "10s", "2m", "1h") into milliseconds.
/// A bare number with no unit is interpreted as seconds.
pub fn parse_duration_ms(s: &str) -> anyhow::Result<i64> {
    let s = s.trim();
    // "ms" must be checked before the single-character "s" suffix.
    if let Some(num) = s.strip_suffix("ms") {
        return Ok(num.parse::<i64>()?);
    }
    if let Some(num) = s.strip_suffix('s') {
        return Ok(num.parse::<i64>()? * 1_000);
    }
    if let Some(num) = s.strip_suffix('m') {
        return Ok(num.parse::<i64>()? * 60_000);
    }
    if let Some(num) = s.strip_suffix('h') {
        return Ok(num.parse::<i64>()? * 3_600_000);
    }
    // No unit suffix: assume seconds.
    Ok(s.parse::<i64>()? * 1_000)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_duration_suffixes() {
        assert_eq!(parse_duration_ms("500ms").unwrap(), 500);
        assert_eq!(parse_duration_ms("10s").unwrap(), 10_000);
        assert_eq!(parse_duration_ms("2m").unwrap(), 120_000);
        assert_eq!(parse_duration_ms("1h").unwrap(), 3_600_000);
        // default seconds when no unit
        assert_eq!(parse_duration_ms("42").unwrap(), 42_000);
    }
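
    // Added sketch: a negative-path check for parse_duration_ms, relying only on the
    // behaviour above (unknown suffixes and non-numeric input fall through to i64
    // parsing and error out). The test name and inputs are new to this file.
    #[test]
    fn rejects_malformed_durations() {
        assert!(parse_duration_ms("").is_err());
        assert!(parse_duration_ms("10x").is_err());
        assert!(parse_duration_ms("ten seconds").is_err());
    }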

    #[test]
    fn validate_file_pipeline_ok() {
        let cfg = PipelineConfig {
            source: SourceConfig {
                kind: "file".into(),
                path: PathBuf::from("/tmp/input.jsonl"),
                time_field: "event_time".into(),
                format: None,
                bootstrap_servers: None,
                topic: None,
                group_id: None,
                auto_offset_reset: None,
                commit_interval_ms: None,
            },
            time: TimeConfig {
                allowed_lateness: "0s".into(),
            },
            window: WindowConfig {
                kind: "tumbling".into(),
                size: "60s".into(),
                slide: None,
                gap: None,
            },
            ops: OpsConfig {
                count_by: Some("word".into()),
                ..Default::default()
            },
            sink: SinkConfig {
                kind: "parquet".into(),
                out_dir: PathBuf::from("/tmp/out"),
                bootstrap_servers: None,
                topic: None,
                acks: None,
                compression: None,
                max_bytes: None,
                partition_field: None,
                partition_format: None,
            },
        };
        cfg.validate().unwrap();
    }

    #[test]
    fn validate_kafka_pipeline_ok() {
        let cfg = PipelineConfig {
            source: SourceConfig {
                kind: "kafka".into(),
                path: PathBuf::from("unused"),
                time_field: "event_time".into(),
                format: None,
                bootstrap_servers: Some("localhost:9092".into()),
                topic: Some("t".into()),
                group_id: Some("g".into()),
                auto_offset_reset: Some("earliest".into()),
                commit_interval_ms: Some(1000),
            },
            time: TimeConfig {
                allowed_lateness: "5s".into(),
            },
            window: WindowConfig {
                kind: "tumbling".into(),
                size: "60s".into(),
                slide: None,
                gap: None,
            },
            ops: OpsConfig {
                count_by: Some("word".into()),
                ..Default::default()
            },
            sink: SinkConfig {
                kind: "kafka".into(),
                out_dir: PathBuf::from("./out"),
                bootstrap_servers: Some("localhost:9092".into()),
                topic: Some("t2".into()),
                acks: Some("all".into()),
                compression: None,
                max_bytes: None,
                partition_field: None,
                partition_format: None,
            },
        };
        cfg.validate().unwrap();
    }

    #[test]
    fn validate_missing_ops_count_by_errors() {
        let cfg = PipelineConfig {
            source: SourceConfig {
                kind: "file".into(),
                path: PathBuf::from("/tmp/in"),
                time_field: "ts".into(),
                format: None,
                bootstrap_servers: None,
                topic: None,
                group_id: None,
                auto_offset_reset: None,
                commit_interval_ms: None,
            },
            time: TimeConfig {
                allowed_lateness: "0s".into(),
            },
            window: WindowConfig {
                kind: "tumbling".into(),
                size: "60s".into(),
                slide: None,
                gap: None,
            },
            ops: OpsConfig {
                count_by: None,
                ..Default::default()
            },
            sink: SinkConfig {
                kind: "parquet".into(),
                out_dir: PathBuf::from("/tmp/out"),
                bootstrap_servers: None,
                topic: None,
                acks: None,
                compression: None,
                max_bytes: None,
                partition_field: None,
                partition_format: None,
            },
        };
        assert!(cfg.validate().is_err());
    }
}