Skip to main content

this/events/operators/
filter.rs

1//! Filter operator — evaluates a boolean condition against the FlowContext
2//!
3//! Drops events that don't match the condition. Supports simple expressions:
4//!
5//! - `field == "value"` — equality
6//! - `field != "value"` — inequality
7//! - `field exists` — field is present in context
8//! - `field not_exists` — field is absent from context
9//!
10//! Fields support dotted access (e.g., `owner.name == "Alice"`).
11//!
12//! ```yaml
13//! - filter:
14//!     condition: "source_id != target_id"
15//! ```
16
17use crate::config::events::FilterConfig;
18use crate::events::context::FlowContext;
19use crate::events::operators::{OpResult, PipelineOperator};
20use anyhow::{Result, anyhow};
21use async_trait::async_trait;
22use serde_json::Value;
23
24/// Supported comparison operators
25#[derive(Debug, Clone, PartialEq)]
26enum CompareOp {
27    /// `==` — equality
28    Equal,
29    /// `!=` — inequality
30    NotEqual,
31    /// `exists` — field is present
32    Exists,
33    /// `not_exists` — field is absent
34    NotExists,
35}
36
37/// Right-hand side of a comparison: literal value or variable reference
38#[derive(Debug, Clone)]
39enum FilterValue {
40    /// A literal value (quoted string, number, boolean, null)
41    Literal(String),
42    /// A variable reference (unquoted identifier, resolved from context)
43    Variable(String),
44}
45
46/// Parsed filter expression
47#[derive(Debug, Clone)]
48struct FilterExpr {
49    /// Left-hand side: variable name (supports dotted access)
50    field: String,
51    /// Comparison operator
52    op: CompareOp,
53    /// Right-hand side value (None for exists/not_exists)
54    value: Option<FilterValue>,
55}
56
57/// Compiled filter operator
58#[derive(Debug, Clone)]
59pub struct FilterOp {
60    /// The parsed expression to evaluate
61    expr: FilterExpr,
62    /// Original condition string (kept for Debug output)
63    _condition: String,
64}
65
66impl FilterOp {
67    /// Create a FilterOp from a FilterConfig
68    ///
69    /// Parses the condition string into a structured expression.
70    pub fn from_config(config: &FilterConfig) -> Result<Self> {
71        let expr = parse_condition(&config.condition)?;
72        Ok(Self {
73            expr,
74            _condition: config.condition.clone(),
75        })
76    }
77}
78
79#[async_trait]
80impl PipelineOperator for FilterOp {
81    async fn execute(&self, ctx: &mut FlowContext) -> Result<OpResult> {
82        let result = evaluate(&self.expr, ctx);
83        if result {
84            Ok(OpResult::Continue)
85        } else {
86            Ok(OpResult::Drop)
87        }
88    }
89
90    fn name(&self) -> &str {
91        "filter"
92    }
93}
94
95/// Parse a condition string into a FilterExpr
96fn parse_condition(condition: &str) -> Result<FilterExpr> {
97    let condition = condition.trim();
98
99    // Try `field not_exists` (must be before `!=` check)
100    if let Some(field) = condition.strip_suffix(" not_exists") {
101        return Ok(FilterExpr {
102            field: field.trim().to_string(),
103            op: CompareOp::NotExists,
104            value: None,
105        });
106    }
107
108    // Try `field exists`
109    if let Some(field) = condition.strip_suffix(" exists") {
110        return Ok(FilterExpr {
111            field: field.trim().to_string(),
112            op: CompareOp::Exists,
113            value: None,
114        });
115    }
116
117    // Try `field != value`
118    if let Some((left, right)) = condition.split_once(" != ") {
119        return Ok(FilterExpr {
120            field: left.trim().to_string(),
121            op: CompareOp::NotEqual,
122            value: Some(parse_rhs(right.trim())),
123        });
124    }
125
126    // Try `field == value`
127    if let Some((left, right)) = condition.split_once(" == ") {
128        return Ok(FilterExpr {
129            field: left.trim().to_string(),
130            op: CompareOp::Equal,
131            value: Some(parse_rhs(right.trim())),
132        });
133    }
134
135    Err(anyhow!(
136        "filter: cannot parse condition '{}'. Expected: 'field == value', 'field != value', 'field exists', or 'field not_exists'",
137        condition
138    ))
139}
140
141/// Remove surrounding quotes from a string value
142fn unquote(s: &str) -> String {
143    if (s.starts_with('"') && s.ends_with('"')) || (s.starts_with('\'') && s.ends_with('\'')) {
144        s[1..s.len() - 1].to_string()
145    } else {
146        s.to_string()
147    }
148}
149
150/// Check if a string represents a literal value (quoted, number, boolean, null)
151/// rather than a variable reference.
152fn is_literal_value(s: &str) -> bool {
153    // Quoted strings
154    if (s.starts_with('"') && s.ends_with('"')) || (s.starts_with('\'') && s.ends_with('\'')) {
155        return true;
156    }
157    // Numbers (int or float)
158    if s.parse::<f64>().is_ok() {
159        return true;
160    }
161    // Booleans
162    if s == "true" || s == "false" {
163        return true;
164    }
165    // Null
166    if s == "null" {
167        return true;
168    }
169    false
170}
171
172/// Parse the right-hand side of a comparison into a FilterValue
173fn parse_rhs(s: &str) -> FilterValue {
174    if is_literal_value(s) {
175        FilterValue::Literal(unquote(s))
176    } else {
177        FilterValue::Variable(s.to_string())
178    }
179}
180
181/// Resolve a FilterValue to a string for comparison
182fn resolve_filter_value(value: &FilterValue, ctx: &FlowContext) -> Option<String> {
183    match value {
184        FilterValue::Literal(s) => Some(s.clone()),
185        FilterValue::Variable(var_name) => ctx.get_var(var_name).map(value_to_string),
186    }
187}
188
189/// Convert a JSON Value to its string representation for comparison
190fn value_to_string(val: &Value) -> String {
191    match val {
192        Value::String(s) => s.clone(),
193        Value::Number(n) => n.to_string(),
194        Value::Bool(b) => b.to_string(),
195        Value::Null => "null".to_string(),
196        _ => val.to_string(),
197    }
198}
199
200/// Evaluate a filter expression against the context
201fn evaluate(expr: &FilterExpr, ctx: &FlowContext) -> bool {
202    let var = ctx.get_var(&expr.field);
203
204    match expr.op {
205        CompareOp::Exists => var.is_some(),
206        CompareOp::NotExists => var.is_none(),
207        CompareOp::Equal => match (var, &expr.value) {
208            (Some(val), Some(filter_val)) => {
209                match resolve_filter_value(filter_val, ctx) {
210                    Some(resolved) => value_matches(val, &resolved),
211                    None => false, // RHS variable not found → not equal
212                }
213            }
214            _ => false,
215        },
216        CompareOp::NotEqual => match (var, &expr.value) {
217            (Some(val), Some(filter_val)) => {
218                match resolve_filter_value(filter_val, ctx) {
219                    Some(resolved) => !value_matches(val, &resolved),
220                    None => true, // RHS variable not found → not equal
221                }
222            }
223            (None, _) => true, // Missing field != anything is true
224            _ => true,
225        },
226    }
227}
228
229/// Compare a JSON value against a string representation
230fn value_matches(val: &Value, expected: &str) -> bool {
231    match val {
232        Value::String(s) => s == expected,
233        Value::Number(n) => n.to_string() == expected,
234        Value::Bool(b) => b.to_string() == expected,
235        Value::Null => expected == "null",
236        _ => false,
237    }
238}
239
240#[cfg(test)]
241mod tests {
242    use super::*;
243    use crate::core::events::{EntityEvent, FrameworkEvent, LinkEvent};
244    use crate::core::service::LinkService;
245    use serde_json::json;
246    use std::collections::HashMap;
247    use std::sync::Arc;
248    use uuid::Uuid;
249
250    // ── Mock LinkService ─────────────────────────────────────────────
251
252    struct MockLinkService;
253
254    #[async_trait]
255    impl LinkService for MockLinkService {
256        async fn create(
257            &self,
258            _link: crate::core::link::LinkEntity,
259        ) -> Result<crate::core::link::LinkEntity> {
260            unimplemented!()
261        }
262        async fn get(&self, _id: &Uuid) -> Result<Option<crate::core::link::LinkEntity>> {
263            unimplemented!()
264        }
265        async fn list(&self) -> Result<Vec<crate::core::link::LinkEntity>> {
266            unimplemented!()
267        }
268        async fn find_by_source(
269            &self,
270            _source_id: &Uuid,
271            _link_type: Option<&str>,
272            _target_type: Option<&str>,
273        ) -> Result<Vec<crate::core::link::LinkEntity>> {
274            unimplemented!()
275        }
276        async fn find_by_target(
277            &self,
278            _target_id: &Uuid,
279            _link_type: Option<&str>,
280            _source_type: Option<&str>,
281        ) -> Result<Vec<crate::core::link::LinkEntity>> {
282            unimplemented!()
283        }
284        async fn update(
285            &self,
286            _id: &Uuid,
287            _link: crate::core::link::LinkEntity,
288        ) -> Result<crate::core::link::LinkEntity> {
289            unimplemented!()
290        }
291        async fn delete(&self, _id: &Uuid) -> Result<()> {
292            unimplemented!()
293        }
294        async fn delete_by_entity(&self, _entity_id: &Uuid) -> Result<()> {
295            unimplemented!()
296        }
297    }
298
299    fn mock_link_service() -> Arc<dyn LinkService> {
300        Arc::new(MockLinkService)
301    }
302
303    fn make_link_context(source_id: Uuid, target_id: Uuid) -> FlowContext {
304        let event = FrameworkEvent::Link(LinkEvent::Created {
305            link_type: "follows".to_string(),
306            link_id: Uuid::new_v4(),
307            source_id,
308            target_id,
309            metadata: None,
310        });
311        FlowContext::new(event, mock_link_service(), HashMap::new())
312    }
313
314    fn make_entity_context(entity_type: &str) -> FlowContext {
315        let event = FrameworkEvent::Entity(EntityEvent::Created {
316            entity_type: entity_type.to_string(),
317            entity_id: Uuid::new_v4(),
318            data: json!({"name": "test"}),
319        });
320        FlowContext::new(event, mock_link_service(), HashMap::new())
321    }
322
323    // ── Tests: equality ──────────────────────────────────────────────
324
325    #[tokio::test]
326    async fn test_filter_equal_pass() {
327        let mut ctx = make_entity_context("user");
328        let op = FilterOp::from_config(&FilterConfig {
329            condition: "entity_type == \"user\"".to_string(),
330        })
331        .unwrap();
332
333        let result = op.execute(&mut ctx).await.unwrap();
334        assert!(matches!(result, OpResult::Continue));
335    }
336
337    #[tokio::test]
338    async fn test_filter_equal_drop() {
339        let mut ctx = make_entity_context("user");
340        let op = FilterOp::from_config(&FilterConfig {
341            condition: "entity_type == \"post\"".to_string(),
342        })
343        .unwrap();
344
345        let result = op.execute(&mut ctx).await.unwrap();
346        assert!(matches!(result, OpResult::Drop));
347    }
348
349    // ── Tests: inequality ────────────────────────────────────────────
350
351    #[tokio::test]
352    async fn test_filter_not_equal_pass() {
353        let source_id = Uuid::new_v4();
354        let target_id = Uuid::new_v4();
355        let mut ctx = make_link_context(source_id, target_id);
356
357        let op = FilterOp::from_config(&FilterConfig {
358            condition: "source_id != target_id".to_string(),
359        })
360        .unwrap();
361
362        // source_id != target_id evaluates by comparing the string values
363        // Since they're different UUIDs, this should pass
364        let result = op.execute(&mut ctx).await.unwrap();
365        assert!(matches!(result, OpResult::Continue));
366    }
367
368    #[tokio::test]
369    async fn test_filter_not_equal_drop() {
370        let mut ctx = make_entity_context("user");
371        let op = FilterOp::from_config(&FilterConfig {
372            condition: "entity_type != \"user\"".to_string(),
373        })
374        .unwrap();
375
376        let result = op.execute(&mut ctx).await.unwrap();
377        assert!(matches!(result, OpResult::Drop));
378    }
379
380    // ── Tests: exists / not_exists ───────────────────────────────────
381
382    #[tokio::test]
383    async fn test_filter_exists_pass() {
384        let mut ctx = make_entity_context("user");
385        let op = FilterOp::from_config(&FilterConfig {
386            condition: "entity_type exists".to_string(),
387        })
388        .unwrap();
389
390        let result = op.execute(&mut ctx).await.unwrap();
391        assert!(matches!(result, OpResult::Continue));
392    }
393
394    #[tokio::test]
395    async fn test_filter_exists_drop() {
396        let mut ctx = make_entity_context("user");
397        let op = FilterOp::from_config(&FilterConfig {
398            condition: "nonexistent exists".to_string(),
399        })
400        .unwrap();
401
402        let result = op.execute(&mut ctx).await.unwrap();
403        assert!(matches!(result, OpResult::Drop));
404    }
405
406    #[tokio::test]
407    async fn test_filter_not_exists_pass() {
408        let mut ctx = make_entity_context("user");
409        let op = FilterOp::from_config(&FilterConfig {
410            condition: "nonexistent not_exists".to_string(),
411        })
412        .unwrap();
413
414        let result = op.execute(&mut ctx).await.unwrap();
415        assert!(matches!(result, OpResult::Continue));
416    }
417
418    #[tokio::test]
419    async fn test_filter_not_exists_drop() {
420        let mut ctx = make_entity_context("user");
421        let op = FilterOp::from_config(&FilterConfig {
422            condition: "entity_type not_exists".to_string(),
423        })
424        .unwrap();
425
426        let result = op.execute(&mut ctx).await.unwrap();
427        assert!(matches!(result, OpResult::Drop));
428    }
429
430    // ── Tests: dotted access ─────────────────────────────────────────
431
432    #[tokio::test]
433    async fn test_filter_dotted_access() {
434        let mut ctx = make_entity_context("user");
435        ctx.set_var("owner", json!({"name": "Alice", "role": "admin"}));
436
437        let op = FilterOp::from_config(&FilterConfig {
438            condition: "owner.role == \"admin\"".to_string(),
439        })
440        .unwrap();
441
442        let result = op.execute(&mut ctx).await.unwrap();
443        assert!(matches!(result, OpResult::Continue));
444    }
445
446    #[tokio::test]
447    async fn test_filter_dotted_access_missing() {
448        let mut ctx = make_entity_context("user");
449        ctx.set_var("owner", json!({"name": "Alice"}));
450
451        let op = FilterOp::from_config(&FilterConfig {
452            condition: "owner.role exists".to_string(),
453        })
454        .unwrap();
455
456        let result = op.execute(&mut ctx).await.unwrap();
457        assert!(matches!(result, OpResult::Drop));
458    }
459
460    // ── Tests: parse errors ──────────────────────────────────────────
461
462    #[test]
463    fn test_filter_parse_error() {
464        let result = FilterOp::from_config(&FilterConfig {
465            condition: "invalid condition without operator".to_string(),
466        });
467        assert!(result.is_err());
468    }
469
470    // ── Tests: value types ───────────────────────────────────────────
471
472    #[tokio::test]
473    async fn test_filter_number_comparison() {
474        let mut ctx = make_entity_context("user");
475        ctx.set_var("count", json!(42));
476
477        let op = FilterOp::from_config(&FilterConfig {
478            condition: "count == 42".to_string(),
479        })
480        .unwrap();
481
482        let result = op.execute(&mut ctx).await.unwrap();
483        assert!(matches!(result, OpResult::Continue));
484    }
485
486    #[tokio::test]
487    async fn test_filter_boolean_comparison() {
488        let mut ctx = make_entity_context("user");
489        ctx.set_var("active", json!(true));
490
491        let op = FilterOp::from_config(&FilterConfig {
492            condition: "active == true".to_string(),
493        })
494        .unwrap();
495
496        let result = op.execute(&mut ctx).await.unwrap();
497        assert!(matches!(result, OpResult::Continue));
498    }
499
500    // ── Tests: unquoted strings ──────────────────────────────────────
501
502    #[tokio::test]
503    async fn test_filter_single_quotes() {
504        let mut ctx = make_entity_context("user");
505        let op = FilterOp::from_config(&FilterConfig {
506            condition: "entity_type == 'user'".to_string(),
507        })
508        .unwrap();
509
510        let result = op.execute(&mut ctx).await.unwrap();
511        assert!(matches!(result, OpResult::Continue));
512    }
513
514    // ── Tests: variable-to-variable comparison ──────────────────────
515
516    #[tokio::test]
517    async fn test_filter_var_to_var_not_equal_same_uuid_drops() {
518        // source_id == target_id (same UUID) → source_id != target_id should Drop
519        let same_id = Uuid::new_v4();
520        let mut ctx = make_link_context(same_id, same_id);
521
522        let op = FilterOp::from_config(&FilterConfig {
523            condition: "source_id != target_id".to_string(),
524        })
525        .unwrap();
526
527        let result = op.execute(&mut ctx).await.unwrap();
528        assert!(
529            matches!(result, OpResult::Drop),
530            "self-link (source_id == target_id) should be dropped by != filter"
531        );
532    }
533
534    #[tokio::test]
535    async fn test_filter_var_to_var_equal_same_uuid_passes() {
536        // source_id == target_id (same UUID) → source_id == target_id should Continue
537        let same_id = Uuid::new_v4();
538        let mut ctx = make_link_context(same_id, same_id);
539
540        let op = FilterOp::from_config(&FilterConfig {
541            condition: "source_id == target_id".to_string(),
542        })
543        .unwrap();
544
545        let result = op.execute(&mut ctx).await.unwrap();
546        assert!(matches!(result, OpResult::Continue));
547    }
548
549    #[tokio::test]
550    async fn test_filter_var_to_var_equal_different_uuids_drops() {
551        // source_id != target_id (different UUIDs) → source_id == target_id should Drop
552        let mut ctx = make_link_context(Uuid::new_v4(), Uuid::new_v4());
553
554        let op = FilterOp::from_config(&FilterConfig {
555            condition: "source_id == target_id".to_string(),
556        })
557        .unwrap();
558
559        let result = op.execute(&mut ctx).await.unwrap();
560        assert!(matches!(result, OpResult::Drop));
561    }
562
563    #[tokio::test]
564    async fn test_filter_quoted_stays_literal() {
565        // Quoted "target_id" should NOT be resolved as a variable
566        let mut ctx = make_link_context(Uuid::new_v4(), Uuid::new_v4());
567
568        let op = FilterOp::from_config(&FilterConfig {
569            condition: "source_id != \"target_id\"".to_string(),
570        })
571        .unwrap();
572
573        // source_id is a UUID, "target_id" is the literal string → always not equal
574        let result = op.execute(&mut ctx).await.unwrap();
575        assert!(matches!(result, OpResult::Continue));
576    }
577
578    #[tokio::test]
579    async fn test_filter_unknown_var_fallback() {
580        // Unknown variable on RHS: source_id != unknown_var
581        // unknown_var doesn't exist → resolve returns None → treated as not equal
582        let mut ctx = make_link_context(Uuid::new_v4(), Uuid::new_v4());
583
584        let op = FilterOp::from_config(&FilterConfig {
585            condition: "source_id != unknown_var".to_string(),
586        })
587        .unwrap();
588
589        let result = op.execute(&mut ctx).await.unwrap();
590        assert!(
591            matches!(result, OpResult::Continue),
592            "comparison with unknown variable should be 'not equal'"
593        );
594    }
595}