pulse_core/config.rs

use serde::{Deserialize, Serialize};
use std::path::PathBuf;

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct SourceConfig {
    pub kind: String,           // "file" | "kafka"
    #[serde(default)]
    pub path: PathBuf,          // input file (used when kind=="file")
    pub time_field: String,     // e.g., "event_time" or "ts"
    // Kafka-specific (used when kind=="kafka")
    #[serde(default)]
    pub bootstrap_servers: Option<String>,
    #[serde(default)]
    pub topic: Option<String>,
    #[serde(default)]
    pub group_id: Option<String>,
    #[serde(default)]
    pub auto_offset_reset: Option<String>, // "earliest" | "latest"
    #[serde(default)]
    pub commit_interval_ms: Option<u64>,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct TimeConfig {
    pub allowed_lateness: String, // e.g., "10s"
}

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct WindowConfig {
    #[serde(rename = "type")]
    pub kind: String, // "tumbling" | "sliding" | "session"
    pub size: String, // e.g., "60s"
    #[serde(default)]
    pub slide: Option<String>, // sliding windows only, e.g., "10s"
    #[serde(default)]
    pub gap: Option<String>, // session windows only, e.g., "30s"
}

#[derive(Debug, Deserialize, Serialize, Clone, Default)]
pub struct OpsConfig {
    #[serde(default)]
    pub count_by: Option<String>, // field to group counts by, e.g., "word"
}

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct SinkConfig {
    pub kind: String,     // "parquet" | "file" | "kafka"
    #[serde(default)]
    pub out_dir: PathBuf, // parquet output directory, or the output file path when kind=="file"
    // Kafka-specific (used when kind=="kafka")
    #[serde(default)]
    pub bootstrap_servers: Option<String>,
    #[serde(default)]
    pub topic: Option<String>,
    #[serde(default)]
    pub acks: Option<String>, // "all" | "1" | "0"
    // Parquet-specific options (used when kind=="parquet")
    #[serde(default)]
    pub compression: Option<String>, // "none" | "snappy" | "zstd"
    #[serde(default)]
    pub max_bytes: Option<u64>,
    #[serde(default)]
    pub partition_field: Option<String>, // defaults to event_time if None
    #[serde(default)]
    pub partition_format: Option<String>, // e.g., "%Y-%m-%d" for date partitions
}

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct PipelineConfig {
    pub source: SourceConfig,
    pub time: TimeConfig,
    pub window: WindowConfig,
    pub ops: OpsConfig,
    pub sink: SinkConfig,
}
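
// Illustrative config document for the structs above (shown as YAML; this file
// does not pin down the on-disk format, so treat the shape, not the syntax, as
// authoritative). Note that WindowConfig's `kind` serializes as `type`.
//
//   source:
//     kind: file
//     path: /tmp/input.jsonl
//     time_field: event_time
//   time:
//     allowed_lateness: 10s
//   window:
//     type: tumbling
//     size: 60s
//   ops:
//     count_by: word
//   sink:
//     kind: parquet
//     out_dir: /tmp/out
//     compression: zstd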

impl PipelineConfig {
    pub fn validate(&self) -> anyhow::Result<()> {
        match self.source.kind.as_str() {
            "file" => {
                if self.source.path.as_os_str().is_empty() {
                    anyhow::bail!("source.path must be set for file source");
                }
            }
            "kafka" => {
                if self.source.bootstrap_servers.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("source.bootstrap_servers must be set for kafka source");
                }
                if self.source.topic.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("source.topic must be set for kafka source");
                }
                if self.source.group_id.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("source.group_id must be set for kafka source");
                }
            }
            other => anyhow::bail!("unsupported source kind: {}", other),
        }
        match self.window.kind.as_str() {
            "tumbling" => {}
            "sliding" => {
                if self.window.slide.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("window.slide must be set for sliding windows");
                }
            }
            "session" => {
                if self.window.gap.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("window.gap must be set for session windows");
                }
            }
            other => anyhow::bail!("unsupported window type: {}", other),
        }
        match self.sink.kind.as_str() {
            "parquet" | "file" => {
                if self.sink.out_dir.as_os_str().is_empty() {
                    anyhow::bail!("sink.out_dir must be set for {} sink", self.sink.kind);
                }
            }
            "kafka" => {
                if self.sink.bootstrap_servers.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("sink.bootstrap_servers must be set for kafka sink");
                }
                if self.sink.topic.as_deref().unwrap_or("").is_empty() {
                    anyhow::bail!("sink.topic must be set for kafka sink");
                }
            }
            other => anyhow::bail!("unsupported sink kind: {}", other),
        }
        if self.ops.count_by.is_none() {
            anyhow::bail!("ops.count_by must be set (e.g., \"word\")");
        }
        Ok(())
    }
}
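
// A loader sketch (not part of the original module): assumes the config lives
// in a JSON file and that `serde_json` is a dependency, neither of which this
// file confirms; any serde format crate would slot in the same way. Kept as a
// comment so the module compiles regardless of what is actually wired in.
//
// pub fn load(path: &std::path::Path) -> anyhow::Result<PipelineConfig> {
//     let text = std::fs::read_to_string(path)?;
//     let cfg: PipelineConfig = serde_json::from_str(&text)?;
//     cfg.validate()?; // fail fast on a bad pipeline definition
//     Ok(cfg)
// }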

pub fn parse_duration_ms(s: &str) -> anyhow::Result<i64> {
    // very small parser for values like "10s", "500ms", "2m"
    let s = s.trim();
    // note: "ms" must be checked before 's', or "500ms" would hit the seconds
    // branch as "500m" + 's' and fail to parse
    if let Some(num) = s.strip_suffix("ms") {
        return Ok(num.parse::<i64>()?);
    }
    if let Some(num) = s.strip_suffix('s') {
        return Ok(num.parse::<i64>()? * 1_000);
    }
    if let Some(num) = s.strip_suffix('m') {
        return Ok(num.parse::<i64>()? * 60_000);
    }
    if let Some(num) = s.strip_suffix('h') {
        return Ok(num.parse::<i64>()? * 3_600_000);
    }
    // no unit suffix: assume seconds
    Ok(s.parse::<i64>()? * 1_000)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_duration_suffixes() {
        assert_eq!(parse_duration_ms("500ms").unwrap(), 500);
        assert_eq!(parse_duration_ms("10s").unwrap(), 10_000);
        assert_eq!(parse_duration_ms("2m").unwrap(), 120_000);
        assert_eq!(parse_duration_ms("1h").unwrap(), 3_600_000);
        // default seconds when no unit
        assert_eq!(parse_duration_ms("42").unwrap(), 42_000);
    }
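
    // Added check (consistent with the parser above): unknown suffixes and
    // non-numeric input fall through to the final integer parse and error.
    #[test]
    fn rejects_malformed_durations() {
        assert!(parse_duration_ms("abc").is_err());
        assert!(parse_duration_ms("10x").is_err());
        assert!(parse_duration_ms("").is_err());
    }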

    #[test]
    fn validate_file_pipeline_ok() {
        let cfg = PipelineConfig {
            source: SourceConfig {
                kind: "file".into(),
                path: PathBuf::from("/tmp/input.jsonl"),
                time_field: "event_time".into(),
                bootstrap_servers: None,
                topic: None,
                group_id: None,
                auto_offset_reset: None,
                commit_interval_ms: None,
            },
            time: TimeConfig { allowed_lateness: "0s".into() },
            window: WindowConfig { kind: "tumbling".into(), size: "60s".into(), slide: None, gap: None },
            ops: OpsConfig { count_by: Some("word".into()) },
            sink: SinkConfig { kind: "parquet".into(), out_dir: PathBuf::from("/tmp/out"), bootstrap_servers: None, topic: None, acks: None, compression: None, max_bytes: None, partition_field: None, partition_format: None },
        };
        cfg.validate().unwrap();
    }

    #[test]
    fn validate_kafka_pipeline_ok() {
        let cfg = PipelineConfig {
            source: SourceConfig {
                kind: "kafka".into(),
                path: PathBuf::new(), // not used by kafka sources
                time_field: "event_time".into(),
                bootstrap_servers: Some("localhost:9092".into()),
                topic: Some("t".into()),
                group_id: Some("g".into()),
                auto_offset_reset: Some("earliest".into()),
                commit_interval_ms: Some(1000),
            },
            time: TimeConfig { allowed_lateness: "5s".into() },
            window: WindowConfig { kind: "tumbling".into(), size: "60s".into(), slide: None, gap: None },
            ops: OpsConfig { count_by: Some("word".into()) },
            sink: SinkConfig { kind: "kafka".into(), out_dir: PathBuf::new(), bootstrap_servers: Some("localhost:9092".into()), topic: Some("t2".into()), acks: Some("all".into()), compression: None, max_bytes: None, partition_field: None, partition_format: None },
        };
        cfg.validate().unwrap();
    }

    #[test]
    fn validate_missing_ops_count_by_errors() {
        let cfg = PipelineConfig {
            source: SourceConfig { kind: "file".into(), path: PathBuf::from("/tmp/in"), time_field: "ts".into(), bootstrap_servers: None, topic: None, group_id: None, auto_offset_reset: None, commit_interval_ms: None },
            time: TimeConfig { allowed_lateness: "0s".into() },
            window: WindowConfig { kind: "tumbling".into(), size: "60s".into(), slide: None, gap: None },
            ops: OpsConfig { count_by: None },
            sink: SinkConfig { kind: "parquet".into(), out_dir: PathBuf::from("/tmp/out"), bootstrap_servers: None, topic: None, acks: None, compression: None, max_bytes: None, partition_field: None, partition_format: None },
        };
        assert!(cfg.validate().is_err());
    }
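
    // Added check: the catch-all arm in validate() rejects source kinds the
    // pipeline does not recognize ("http" here is an arbitrary unknown kind).
    #[test]
    fn validate_unsupported_source_kind_errors() {
        let cfg = PipelineConfig {
            source: SourceConfig { kind: "http".into(), path: PathBuf::new(), time_field: "ts".into(), bootstrap_servers: None, topic: None, group_id: None, auto_offset_reset: None, commit_interval_ms: None },
            time: TimeConfig { allowed_lateness: "0s".into() },
            window: WindowConfig { kind: "tumbling".into(), size: "60s".into(), slide: None, gap: None },
            ops: OpsConfig { count_by: Some("word".into()) },
            sink: SinkConfig { kind: "parquet".into(), out_dir: PathBuf::from("/tmp/out"), bootstrap_servers: None, topic: None, acks: None, compression: None, max_bytes: None, partition_field: None, partition_format: None },
        };
        assert!(cfg.validate().is_err());
    }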
}