1use crate::config::events::{FlowConfig, PipelineStep};
18use crate::events::matcher::EventMatcher;
19use crate::events::operators::*;
20use anyhow::{Context, Result};
21
22#[derive(Debug)]
24pub struct CompiledFlow {
25 pub name: String,
27
28 pub matcher: EventMatcher,
30
31 pub operators: Vec<Box<dyn PipelineOperator>>,
33}
34
35pub fn compile_flow(config: &FlowConfig) -> Result<CompiledFlow> {
47 let matcher = EventMatcher::compile(&config.trigger)
48 .with_context(|| format!("flow '{}': invalid trigger", config.name))?;
49
50 let mut operators: Vec<Box<dyn PipelineOperator>> = Vec::new();
51
52 for (i, step) in config.pipeline.iter().enumerate() {
53 let op: Box<dyn PipelineOperator> = compile_step(step)
54 .with_context(|| format!("flow '{}': step {} failed to compile", config.name, i))?;
55 operators.push(op);
56 }
57
58 Ok(CompiledFlow {
59 name: config.name.clone(),
60 matcher,
61 operators,
62 })
63}
64
65fn compile_step(step: &PipelineStep) -> Result<Box<dyn PipelineOperator>> {
67 match step {
68 PipelineStep::Resolve(config) => Ok(Box::new(ResolveOp::from_config(config))),
69 PipelineStep::Filter(config) => Ok(Box::new(FilterOp::from_config(config)?)),
70 PipelineStep::FanOut(config) => Ok(Box::new(FanOutOp::from_config(config))),
71 PipelineStep::Batch(config) => Ok(Box::new(BatchOp::from_config(config)?)),
72 PipelineStep::Deduplicate(config) => Ok(Box::new(DeduplicateOp::from_config(config)?)),
73 PipelineStep::Map(config) => Ok(Box::new(MapOp::from_config(config))),
74 PipelineStep::RateLimit(config) => Ok(Box::new(RateLimitOp::from_config(config)?)),
75 PipelineStep::Deliver(config) => Ok(Box::new(DeliverOp::from_config(config)?)),
76 }
77}
78
79pub fn compile_flows(configs: &[FlowConfig]) -> Result<Vec<CompiledFlow>> {
83 let mut compiled = Vec::new();
84 let mut errors = Vec::new();
85
86 for config in configs {
87 match compile_flow(config) {
88 Ok(flow) => compiled.push(flow),
89 Err(e) => {
90 tracing::error!(flow = %config.name, error = %e, "failed to compile flow");
91 errors.push(format!("{}: {}", config.name, e));
92 }
93 }
94 }
95
96 if !errors.is_empty() && compiled.is_empty() {
97 anyhow::bail!("all flows failed to compile: {}", errors.join("; "));
98 }
99
100 Ok(compiled)
101}
102
103#[cfg(test)]
104mod tests {
105 use super::*;
106 use crate::config::events::*;
107 use serde_json::json;
108
109 fn make_flow(name: &str, kind: &str, steps: Vec<PipelineStep>) -> FlowConfig {
110 FlowConfig {
111 name: name.to_string(),
112 description: None,
113 trigger: TriggerConfig {
114 kind: kind.to_string(),
115 link_type: None,
116 entity_type: None,
117 },
118 pipeline: steps,
119 }
120 }
121
122 #[test]
123 fn test_compile_empty_pipeline() {
124 let flow = make_flow("test", "link.created", vec![]);
125 let compiled = compile_flow(&flow).unwrap();
126 assert_eq!(compiled.name, "test");
127 assert!(compiled.operators.is_empty());
128 }
129
130 #[test]
131 fn test_compile_full_pipeline() {
132 let flow = make_flow(
133 "follow_notification",
134 "link.created",
135 vec![
136 PipelineStep::Resolve(ResolveConfig {
137 from: "source_id".to_string(),
138 via: None,
139 direction: "forward".to_string(),
140 output_var: "source".to_string(),
141 }),
142 PipelineStep::Filter(FilterConfig {
143 condition: "source_id != target_id".to_string(),
144 }),
145 PipelineStep::Map(MapConfig {
146 template: json!({
147 "title": "{{ source.name }} started following you"
148 }),
149 }),
150 PipelineStep::Deliver(DeliverConfig {
151 sink: Some("in_app".to_string()),
152 sinks: None,
153 }),
154 ],
155 );
156
157 let compiled = compile_flow(&flow).unwrap();
158 assert_eq!(compiled.name, "follow_notification");
159 assert_eq!(compiled.operators.len(), 4);
160 assert_eq!(compiled.operators[0].name(), "resolve");
161 assert_eq!(compiled.operators[1].name(), "filter");
162 assert_eq!(compiled.operators[2].name(), "map");
163 assert_eq!(compiled.operators[3].name(), "deliver");
164 }
165
166 #[test]
167 fn test_compile_with_stateful_operators() {
168 let flow = make_flow(
169 "like_batch",
170 "link.created",
171 vec![
172 PipelineStep::Deduplicate(DeduplicateConfig {
173 key: "source_id".to_string(),
174 window: "1h".to_string(),
175 }),
176 PipelineStep::Batch(BatchConfig {
177 key: "target_id".to_string(),
178 window: "5m".to_string(),
179 min_count: 1,
180 }),
181 PipelineStep::RateLimit(RateLimitConfig {
182 max: 100,
183 per: "1m".to_string(),
184 strategy: "drop".to_string(),
185 }),
186 PipelineStep::Map(MapConfig {
187 template: json!({"title": "batch"}),
188 }),
189 PipelineStep::Deliver(DeliverConfig {
190 sink: Some("push".to_string()),
191 sinks: None,
192 }),
193 ],
194 );
195
196 let compiled = compile_flow(&flow).unwrap();
197 assert_eq!(compiled.operators.len(), 5);
198 assert_eq!(compiled.operators[0].name(), "deduplicate");
199 assert_eq!(compiled.operators[1].name(), "batch");
200 assert_eq!(compiled.operators[2].name(), "rate_limit");
201 }
202
203 #[test]
204 fn test_compile_with_fan_out() {
205 let flow = make_flow(
206 "broadcast",
207 "entity.created",
208 vec![
209 PipelineStep::FanOut(FanOutConfig {
210 from: "entity_id".to_string(),
211 via: "follows".to_string(),
212 direction: "reverse".to_string(),
213 output_var: "follower".to_string(),
214 }),
215 PipelineStep::Map(MapConfig {
216 template: json!({"title": "new content"}),
217 }),
218 PipelineStep::Deliver(DeliverConfig {
219 sink: Some("in_app".to_string()),
220 sinks: None,
221 }),
222 ],
223 );
224
225 let compiled = compile_flow(&flow).unwrap();
226 assert_eq!(compiled.operators.len(), 3);
227 assert_eq!(compiled.operators[0].name(), "fan_out");
228 }
229
230 #[test]
231 fn test_compile_invalid_trigger() {
232 let flow = make_flow("bad", "invalid.kind", vec![]);
233 let result = compile_flow(&flow);
234 assert!(result.is_err());
235 assert!(result.unwrap_err().to_string().contains("invalid trigger"));
236 }
237
238 #[test]
239 fn test_compile_invalid_filter_condition() {
240 let flow = make_flow(
241 "bad_filter",
242 "link.created",
243 vec![PipelineStep::Filter(FilterConfig {
244 condition: "no operator here".to_string(),
245 })],
246 );
247
248 let result = compile_flow(&flow);
249 assert!(result.is_err());
250 }
251
252 #[test]
253 fn test_compile_deliver_no_sink() {
254 let flow = make_flow(
255 "bad_deliver",
256 "link.created",
257 vec![PipelineStep::Deliver(DeliverConfig {
258 sink: None,
259 sinks: None,
260 })],
261 );
262
263 let result = compile_flow(&flow);
264 assert!(result.is_err());
265 }
266
267 #[test]
268 fn test_compile_invalid_duration() {
269 let flow = make_flow(
270 "bad_batch",
271 "link.created",
272 vec![PipelineStep::Batch(BatchConfig {
273 key: "target_id".to_string(),
274 window: "invalid".to_string(),
275 min_count: 1,
276 })],
277 );
278
279 let result = compile_flow(&flow);
280 assert!(result.is_err());
281 }
282
283 #[test]
284 fn test_compile_flows_partial_failure() {
285 let good = make_flow("good", "link.created", vec![]);
286 let bad = make_flow("bad", "invalid.kind", vec![]);
287
288 let compiled = compile_flows(&[good, bad]).unwrap();
289 assert_eq!(compiled.len(), 1);
290 assert_eq!(compiled[0].name, "good");
291 }
292
293 #[test]
294 fn test_compile_flows_all_fail() {
295 let bad1 = make_flow("bad1", "invalid.kind", vec![]);
296 let bad2 = make_flow("bad2", "also.invalid", vec![]);
297
298 let result = compile_flows(&[bad1, bad2]);
299 assert!(result.is_err());
300 }
301}