pulse_core/
config.rs

1use serde::{Deserialize, Serialize};
2use std::path::PathBuf;
3
4#[derive(Debug, Deserialize, Serialize, Clone)]
5pub struct SourceConfig {
6    pub kind: String,           // "file"
7    pub path: PathBuf,          // path to file
8    pub time_field: String,     // e.g., "event_time" or "ts"
9    // Kafka-specific (used when kind=="kafka")
10    #[serde(default)]
11    pub bootstrap_servers: Option<String>,
12    #[serde(default)]
13    pub topic: Option<String>,
14    #[serde(default)]
15    pub group_id: Option<String>,
16    #[serde(default)]
17    pub auto_offset_reset: Option<String>, // "earliest" | "latest"
18    #[serde(default)]
19    pub commit_interval_ms: Option<u64>,
20}
21
22#[derive(Debug, Deserialize, Serialize, Clone)]
23pub struct TimeConfig {
24    pub allowed_lateness: String, // e.g., "10s"
25}
26
27#[derive(Debug, Deserialize, Serialize, Clone)]
28pub struct WindowConfig {
29    #[serde(rename = "type")]
30    pub kind: String, // tumbling|sliding|session
31    pub size: String, // e.g., "60s"
32    #[serde(default)]
33    pub slide: Option<String>,
34    #[serde(default)]
35    pub gap: Option<String>,
36}
37
38#[derive(Debug, Deserialize, Serialize, Clone, Default)]
39pub struct OpsConfig {
40    #[serde(default)]
41    pub count_by: Option<String>,
42}
43
44#[derive(Debug, Deserialize, Serialize, Clone)]
45pub struct SinkConfig {
46    pub kind: String,     // "parquet" | "file" | "kafka"
47    #[serde(default)]
48    pub out_dir: PathBuf, // for parquet or file path for file sink
49    // Kafka-specific (when kind=="kafka")
50    #[serde(default)]
51    pub bootstrap_servers: Option<String>,
52    #[serde(default)]
53    pub topic: Option<String>,
54    #[serde(default)]
55    pub acks: Option<String>, // "all" | "1" | "0"
56}
57
58#[derive(Debug, Deserialize, Serialize, Clone)]
59pub struct PipelineConfig {
60    pub source: SourceConfig,
61    pub time: TimeConfig,
62    pub window: WindowConfig,
63    pub ops: OpsConfig,
64    pub sink: SinkConfig,
65}
66
67impl PipelineConfig {
68    pub fn validate(&self) -> anyhow::Result<()> {
69        match self.source.kind.as_str() {
70            "file" => {
71                if self.source.path.as_os_str().is_empty() {
72                    anyhow::bail!("source.path must be set for file source");
73                }
74            }
75            "kafka" => {
76                if self.source.bootstrap_servers.as_deref().unwrap_or("").is_empty() {
77                    anyhow::bail!("source.bootstrap_servers must be set for kafka source");
78                }
79                if self.source.topic.as_deref().unwrap_or("").is_empty() {
80                    anyhow::bail!("source.topic must be set for kafka source");
81                }
82                if self.source.group_id.as_deref().unwrap_or("").is_empty() {
83                    anyhow::bail!("source.group_id must be set for kafka source");
84                }
85            }
86            other => anyhow::bail!("unsupported source kind: {}", other),
87        }
88        match self.sink.kind.as_str() {
89            "parquet" | "file" => {}
90            "kafka" => {
91                if self.sink.bootstrap_servers.as_deref().unwrap_or("").is_empty() {
92                    anyhow::bail!("sink.bootstrap_servers must be set for kafka sink");
93                }
94                if self.sink.topic.as_deref().unwrap_or("").is_empty() {
95                    anyhow::bail!("sink.topic must be set for kafka sink");
96                }
97            }
98            other => anyhow::bail!("unsupported sink kind: {}", other),
99        }
100        if self.ops.count_by.is_none() {
101            anyhow::bail!("ops.count_by must be set (e.g., word field)");
102        }
103        Ok(())
104    }
105}
106
107pub fn parse_duration_ms(s: &str) -> anyhow::Result<i64> {
108    // very small parser for values like "10s", "500ms", "2m"
109    let s = s.trim();
110    if let Some(num) = s.strip_suffix("ms") {
111        return Ok(num.parse::<i64>()?);
112    }
113    if let Some(num) = s.strip_suffix('s') {
114        return Ok(num.parse::<i64>()? * 1_000);
115    }
116    if let Some(num) = s.strip_suffix('m') {
117        return Ok(num.parse::<i64>()? * 60_000);
118    }
119    if let Some(num) = s.strip_suffix('h') {
120        return Ok(num.parse::<i64>()? * 3_600_000);
121    }
122    // default assume seconds
123    Ok(s.parse::<i64>()? * 1_000)
124}