Skip to main content

this/events/
compiler.rs

1//! Flow compiler — compiles FlowConfig YAML into executable CompiledFlow
2//!
3//! The compiler transforms declarative flow configurations into a list of
4//! executable pipeline operators. Each CompiledFlow contains:
5//! - An `EventMatcher` for deciding which events trigger the flow
6//! - A `Vec<Box<dyn PipelineOperator>>` for the pipeline steps
7//!
8//! # Usage
9//!
10//! ```ignore
11//! let config = FlowConfig { ... };
12//! let compiled = compile_flow(&config)?;
13//! // compiled.matcher.matches(&event) → true/false
14//! // for op in &compiled.operators { op.execute(&mut ctx).await? }
15//! ```
16
17use crate::config::events::{FlowConfig, PipelineStep};
18use crate::events::matcher::EventMatcher;
19use crate::events::operators::*;
20use anyhow::{Context, Result};
21
22/// A compiled flow ready for execution
23#[derive(Debug)]
24pub struct CompiledFlow {
25    /// Flow name (from config)
26    pub name: String,
27
28    /// Compiled event matcher (from trigger config)
29    pub matcher: EventMatcher,
30
31    /// Compiled pipeline operators (in execution order)
32    pub operators: Vec<Box<dyn PipelineOperator>>,
33}
34
35/// Compile a FlowConfig into a CompiledFlow
36///
37/// Validates the configuration and creates executable operator instances.
38///
39/// # Errors
40///
41/// Returns an error if:
42/// - The trigger has an unknown event kind
43/// - A filter condition cannot be parsed
44/// - A deliver step references no sinks
45/// - A duration string cannot be parsed
46pub fn compile_flow(config: &FlowConfig) -> Result<CompiledFlow> {
47    let matcher = EventMatcher::compile(&config.trigger)
48        .with_context(|| format!("flow '{}': invalid trigger", config.name))?;
49
50    let mut operators: Vec<Box<dyn PipelineOperator>> = Vec::new();
51
52    for (i, step) in config.pipeline.iter().enumerate() {
53        let op: Box<dyn PipelineOperator> = compile_step(step)
54            .with_context(|| format!("flow '{}': step {} failed to compile", config.name, i))?;
55        operators.push(op);
56    }
57
58    Ok(CompiledFlow {
59        name: config.name.clone(),
60        matcher,
61        operators,
62    })
63}
64
65/// Compile a single PipelineStep into a PipelineOperator
66fn compile_step(step: &PipelineStep) -> Result<Box<dyn PipelineOperator>> {
67    match step {
68        PipelineStep::Resolve(config) => Ok(Box::new(ResolveOp::from_config(config))),
69        PipelineStep::Filter(config) => Ok(Box::new(FilterOp::from_config(config)?)),
70        PipelineStep::FanOut(config) => Ok(Box::new(FanOutOp::from_config(config))),
71        PipelineStep::Batch(config) => Ok(Box::new(BatchOp::from_config(config)?)),
72        PipelineStep::Deduplicate(config) => Ok(Box::new(DeduplicateOp::from_config(config)?)),
73        PipelineStep::Map(config) => Ok(Box::new(MapOp::from_config(config))),
74        PipelineStep::RateLimit(config) => Ok(Box::new(RateLimitOp::from_config(config)?)),
75        PipelineStep::Deliver(config) => Ok(Box::new(DeliverOp::from_config(config)?)),
76    }
77}
78
79/// Compile multiple flows from a list of FlowConfigs
80///
81/// Returns all successfully compiled flows and logs errors for any that fail.
82pub fn compile_flows(configs: &[FlowConfig]) -> Result<Vec<CompiledFlow>> {
83    let mut compiled = Vec::new();
84    let mut errors = Vec::new();
85
86    for config in configs {
87        match compile_flow(config) {
88            Ok(flow) => compiled.push(flow),
89            Err(e) => {
90                tracing::error!(flow = %config.name, error = %e, "failed to compile flow");
91                errors.push(format!("{}: {}", config.name, e));
92            }
93        }
94    }
95
96    if !errors.is_empty() && compiled.is_empty() {
97        anyhow::bail!("all flows failed to compile: {}", errors.join("; "));
98    }
99
100    Ok(compiled)
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::events::*;
    use serde_json::json;

    /// Build a `FlowConfig` with the given name, trigger kind and pipeline;
    /// all optional trigger fields are left unset.
    fn make_flow(name: &str, kind: &str, steps: Vec<PipelineStep>) -> FlowConfig {
        FlowConfig {
            name: name.to_string(),
            description: None,
            trigger: TriggerConfig {
                kind: kind.to_string(),
                link_type: None,
                entity_type: None,
            },
            pipeline: steps,
        }
    }

    /// Compile `flow`, require that it fails, and return the top-level error
    /// message (the outermost context added by `compile_flow`).
    fn compile_error(flow: &FlowConfig) -> String {
        compile_flow(flow)
            .expect_err("flow should fail to compile")
            .to_string()
    }

    #[test]
    fn test_compile_empty_pipeline() {
        let flow = make_flow("test", "link.created", vec![]);
        let compiled = compile_flow(&flow).unwrap();
        assert_eq!(compiled.name, "test");
        assert!(compiled.operators.is_empty());
    }

    #[test]
    fn test_compile_full_pipeline() {
        let flow = make_flow(
            "follow_notification",
            "link.created",
            vec![
                PipelineStep::Resolve(ResolveConfig {
                    from: "source_id".to_string(),
                    via: None,
                    direction: "forward".to_string(),
                    output_var: "source".to_string(),
                }),
                PipelineStep::Filter(FilterConfig {
                    condition: "source_id != target_id".to_string(),
                }),
                PipelineStep::Map(MapConfig {
                    template: json!({
                        "title": "{{ source.name }} started following you"
                    }),
                }),
                PipelineStep::Deliver(DeliverConfig {
                    sink: Some("in_app".to_string()),
                    sinks: None,
                }),
            ],
        );

        let compiled = compile_flow(&flow).unwrap();
        assert_eq!(compiled.name, "follow_notification");
        assert_eq!(compiled.operators.len(), 4);
        assert_eq!(compiled.operators[0].name(), "resolve");
        assert_eq!(compiled.operators[1].name(), "filter");
        assert_eq!(compiled.operators[2].name(), "map");
        assert_eq!(compiled.operators[3].name(), "deliver");
    }

    #[test]
    fn test_compile_with_stateful_operators() {
        let flow = make_flow(
            "like_batch",
            "link.created",
            vec![
                PipelineStep::Deduplicate(DeduplicateConfig {
                    key: "source_id".to_string(),
                    window: "1h".to_string(),
                }),
                PipelineStep::Batch(BatchConfig {
                    key: "target_id".to_string(),
                    window: "5m".to_string(),
                    min_count: 1,
                }),
                PipelineStep::RateLimit(RateLimitConfig {
                    max: 100,
                    per: "1m".to_string(),
                    strategy: "drop".to_string(),
                }),
                PipelineStep::Map(MapConfig {
                    template: json!({"title": "batch"}),
                }),
                PipelineStep::Deliver(DeliverConfig {
                    sink: Some("push".to_string()),
                    sinks: None,
                }),
            ],
        );

        let compiled = compile_flow(&flow).unwrap();
        assert_eq!(compiled.operators.len(), 5);
        assert_eq!(compiled.operators[0].name(), "deduplicate");
        assert_eq!(compiled.operators[1].name(), "batch");
        assert_eq!(compiled.operators[2].name(), "rate_limit");
        // Also pin the tail of the pipeline so a reordering bug is caught.
        assert_eq!(compiled.operators[3].name(), "map");
        assert_eq!(compiled.operators[4].name(), "deliver");
    }

    #[test]
    fn test_compile_with_fan_out() {
        let flow = make_flow(
            "broadcast",
            "entity.created",
            vec![
                PipelineStep::FanOut(FanOutConfig {
                    from: "entity_id".to_string(),
                    via: "follows".to_string(),
                    direction: "reverse".to_string(),
                    output_var: "follower".to_string(),
                }),
                PipelineStep::Map(MapConfig {
                    template: json!({"title": "new content"}),
                }),
                PipelineStep::Deliver(DeliverConfig {
                    sink: Some("in_app".to_string()),
                    sinks: None,
                }),
            ],
        );

        let compiled = compile_flow(&flow).unwrap();
        assert_eq!(compiled.operators.len(), 3);
        assert_eq!(compiled.operators[0].name(), "fan_out");
        assert_eq!(compiled.operators[1].name(), "map");
        assert_eq!(compiled.operators[2].name(), "deliver");
    }

    #[test]
    fn test_compile_invalid_trigger() {
        let flow = make_flow("bad", "invalid.kind", vec![]);
        let message = compile_error(&flow);
        assert!(message.contains("invalid trigger"));
        // The context must identify which flow was rejected.
        assert!(message.contains("flow 'bad'"));
    }

    #[test]
    fn test_compile_invalid_filter_condition() {
        let flow = make_flow(
            "bad_filter",
            "link.created",
            vec![PipelineStep::Filter(FilterConfig {
                condition: "no operator here".to_string(),
            })],
        );

        let message = compile_error(&flow);
        assert!(message.contains("flow 'bad_filter'"));
        assert!(message.contains("step 0 failed to compile"));
    }

    #[test]
    fn test_compile_deliver_no_sink() {
        let flow = make_flow(
            "bad_deliver",
            "link.created",
            vec![PipelineStep::Deliver(DeliverConfig {
                sink: None,
                sinks: None,
            })],
        );

        let message = compile_error(&flow);
        assert!(message.contains("flow 'bad_deliver'"));
        assert!(message.contains("step 0 failed to compile"));
    }

    #[test]
    fn test_compile_invalid_duration() {
        let flow = make_flow(
            "bad_batch",
            "link.created",
            vec![PipelineStep::Batch(BatchConfig {
                key: "target_id".to_string(),
                window: "invalid".to_string(),
                min_count: 1,
            })],
        );

        let message = compile_error(&flow);
        assert!(message.contains("flow 'bad_batch'"));
        assert!(message.contains("step 0 failed to compile"));
    }

    #[test]
    fn test_compile_error_reports_failing_step_index() {
        // A valid first step followed by an invalid one: the reported index
        // must point at the second step, not the first.
        let flow = make_flow(
            "late_failure",
            "link.created",
            vec![
                PipelineStep::Map(MapConfig {
                    template: json!({"title": "ok"}),
                }),
                PipelineStep::Filter(FilterConfig {
                    condition: "no operator here".to_string(),
                }),
            ],
        );

        let message = compile_error(&flow);
        assert!(message.contains("step 1 failed to compile"));
    }

    #[test]
    fn test_compile_flows_partial_failure() {
        let good = make_flow("good", "link.created", vec![]);
        let bad = make_flow("bad", "invalid.kind", vec![]);

        let compiled = compile_flows(&[good, bad]).unwrap();
        assert_eq!(compiled.len(), 1);
        assert_eq!(compiled[0].name, "good");
    }

    #[test]
    fn test_compile_flows_all_fail() {
        let bad1 = make_flow("bad1", "invalid.kind", vec![]);
        let bad2 = make_flow("bad2", "also.invalid", vec![]);

        let message = compile_flows(&[bad1, bad2])
            .expect_err("all flows are invalid")
            .to_string();
        assert!(message.contains("all flows failed to compile"));
        // Every failing flow must be named in the aggregated message.
        assert!(message.contains("bad1"));
        assert!(message.contains("bad2"));
    }

    #[test]
    fn test_compile_flows_empty_input() {
        // No configs means no failures, so this is not the "all failed" case.
        let compiled = compile_flows(&[]).unwrap();
        assert!(compiled.is_empty());
    }
}