1use serde::{Deserialize, Serialize};
2use std::path::PathBuf;
3
4#[derive(Debug, Deserialize, Serialize, Clone)]
5pub struct SourceConfig {
6 pub kind: String, pub path: PathBuf, pub time_field: String, #[serde(default)]
11 pub bootstrap_servers: Option<String>,
12 #[serde(default)]
13 pub topic: Option<String>,
14 #[serde(default)]
15 pub group_id: Option<String>,
16 #[serde(default)]
17 pub auto_offset_reset: Option<String>, #[serde(default)]
19 pub commit_interval_ms: Option<u64>,
20}
21
22#[derive(Debug, Deserialize, Serialize, Clone)]
23pub struct TimeConfig {
24 pub allowed_lateness: String, }
26
27#[derive(Debug, Deserialize, Serialize, Clone)]
28pub struct WindowConfig {
29 #[serde(rename = "type")]
30 pub kind: String, pub size: String, #[serde(default)]
33 pub slide: Option<String>,
34 #[serde(default)]
35 pub gap: Option<String>,
36}
37
38#[derive(Debug, Deserialize, Serialize, Clone, Default)]
39pub struct OpsConfig {
40 #[serde(default)]
41 pub count_by: Option<String>,
42}
43
44#[derive(Debug, Deserialize, Serialize, Clone)]
45pub struct SinkConfig {
46 pub kind: String, #[serde(default)]
48 pub out_dir: PathBuf, #[serde(default)]
51 pub bootstrap_servers: Option<String>,
52 #[serde(default)]
53 pub topic: Option<String>,
54 #[serde(default)]
55 pub acks: Option<String>, }
57
58#[derive(Debug, Deserialize, Serialize, Clone)]
59pub struct PipelineConfig {
60 pub source: SourceConfig,
61 pub time: TimeConfig,
62 pub window: WindowConfig,
63 pub ops: OpsConfig,
64 pub sink: SinkConfig,
65}
66
67impl PipelineConfig {
68 pub fn validate(&self) -> anyhow::Result<()> {
69 match self.source.kind.as_str() {
70 "file" => {
71 if self.source.path.as_os_str().is_empty() {
72 anyhow::bail!("source.path must be set for file source");
73 }
74 }
75 "kafka" => {
76 if self.source.bootstrap_servers.as_deref().unwrap_or("").is_empty() {
77 anyhow::bail!("source.bootstrap_servers must be set for kafka source");
78 }
79 if self.source.topic.as_deref().unwrap_or("").is_empty() {
80 anyhow::bail!("source.topic must be set for kafka source");
81 }
82 if self.source.group_id.as_deref().unwrap_or("").is_empty() {
83 anyhow::bail!("source.group_id must be set for kafka source");
84 }
85 }
86 other => anyhow::bail!("unsupported source kind: {}", other),
87 }
88 match self.sink.kind.as_str() {
89 "parquet" | "file" => {}
90 "kafka" => {
91 if self.sink.bootstrap_servers.as_deref().unwrap_or("").is_empty() {
92 anyhow::bail!("sink.bootstrap_servers must be set for kafka sink");
93 }
94 if self.sink.topic.as_deref().unwrap_or("").is_empty() {
95 anyhow::bail!("sink.topic must be set for kafka sink");
96 }
97 }
98 other => anyhow::bail!("unsupported sink kind: {}", other),
99 }
100 if self.ops.count_by.is_none() {
101 anyhow::bail!("ops.count_by must be set (e.g., word field)");
102 }
103 Ok(())
104 }
105}
106
107pub fn parse_duration_ms(s: &str) -> anyhow::Result<i64> {
108 let s = s.trim();
110 if let Some(num) = s.strip_suffix("ms") {
111 return Ok(num.parse::<i64>()?);
112 }
113 if let Some(num) = s.strip_suffix('s') {
114 return Ok(num.parse::<i64>()? * 1_000);
115 }
116 if let Some(num) = s.strip_suffix('m') {
117 return Ok(num.parse::<i64>()? * 60_000);
118 }
119 if let Some(num) = s.strip_suffix('h') {
120 return Ok(num.parse::<i64>()? * 3_600_000);
121 }
122 Ok(s.parse::<i64>()? * 1_000)
124}