1use crate::config::events::FilterConfig;
18use crate::events::context::FlowContext;
19use crate::events::operators::{OpResult, PipelineOperator};
20use anyhow::{Result, anyhow};
21use async_trait::async_trait;
22use serde_json::Value;
23
/// Comparison performed by a filter expression.
///
/// The enum carries no data, so it is `Copy` and can be matched or passed by value
/// without borrowing the expression that owns it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompareOp {
    /// `field == value`
    Equal,
    /// `field != value`
    NotEqual,
    /// `field exists`
    Exists,
    /// `field not_exists`
    NotExists,
}
36
/// Right-hand side of a comparison.
///
/// Derives `PartialEq`/`Eq` (like [`CompareOp`]) so parsed values can be compared
/// directly, e.g. in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
enum FilterValue {
    /// A constant taken from the condition text (quotes already removed).
    Literal(String),
    /// The name of a context variable to look up at evaluation time.
    Variable(String),
}
45
46#[derive(Debug, Clone)]
48struct FilterExpr {
49 field: String,
51 op: CompareOp,
53 value: Option<FilterValue>,
55}
56
57#[derive(Debug, Clone)]
59pub struct FilterOp {
60 expr: FilterExpr,
62 _condition: String,
64}
65
66impl FilterOp {
67 pub fn from_config(config: &FilterConfig) -> Result<Self> {
71 let expr = parse_condition(&config.condition)?;
72 Ok(Self {
73 expr,
74 _condition: config.condition.clone(),
75 })
76 }
77}
78
79#[async_trait]
80impl PipelineOperator for FilterOp {
81 async fn execute(&self, ctx: &mut FlowContext) -> Result<OpResult> {
82 let result = evaluate(&self.expr, ctx);
83 if result {
84 Ok(OpResult::Continue)
85 } else {
86 Ok(OpResult::Drop)
87 }
88 }
89
90 fn name(&self) -> &str {
91 "filter"
92 }
93}
94
95fn parse_condition(condition: &str) -> Result<FilterExpr> {
97 let condition = condition.trim();
98
99 if let Some(field) = condition.strip_suffix(" not_exists") {
101 return Ok(FilterExpr {
102 field: field.trim().to_string(),
103 op: CompareOp::NotExists,
104 value: None,
105 });
106 }
107
108 if let Some(field) = condition.strip_suffix(" exists") {
110 return Ok(FilterExpr {
111 field: field.trim().to_string(),
112 op: CompareOp::Exists,
113 value: None,
114 });
115 }
116
117 if let Some((left, right)) = condition.split_once(" != ") {
119 return Ok(FilterExpr {
120 field: left.trim().to_string(),
121 op: CompareOp::NotEqual,
122 value: Some(parse_rhs(right.trim())),
123 });
124 }
125
126 if let Some((left, right)) = condition.split_once(" == ") {
128 return Ok(FilterExpr {
129 field: left.trim().to_string(),
130 op: CompareOp::Equal,
131 value: Some(parse_rhs(right.trim())),
132 });
133 }
134
135 Err(anyhow!(
136 "filter: cannot parse condition '{}'. Expected: 'field == value', 'field != value', 'field exists', or 'field not_exists'",
137 condition
138 ))
139}
140
/// Removes one pair of matching surrounding quotes (`"…"` or `'…'`) from `s`.
///
/// Input that is not wrapped in a matching pair is returned unchanged. This
/// includes a lone `"` or `'`: a single character cannot be both the opening and
/// the closing quote, and slicing it as if it were would panic.
fn unquote(s: &str) -> String {
    let inner = s
        .strip_prefix('"')
        .and_then(|rest| rest.strip_suffix('"'))
        .or_else(|| {
            s.strip_prefix('\'')
                .and_then(|rest| rest.strip_suffix('\''))
        });
    inner.unwrap_or(s).to_string()
}
149
/// Reports whether `s` should be treated as a literal rather than a variable name.
///
/// Literals are:
/// - text wrapped in a matching pair of double or single quotes (at least two
///   characters, so a lone quote character is not a quoted literal),
/// - anything `f64` can parse,
/// - the keywords `true`, `false` and `null`.
///
/// Note: `f64` parsing also accepts spellings such as `inf`, `infinity` and `nan`,
/// so identifiers with those names are classified as literals, not variables.
fn is_literal_value(s: &str) -> bool {
    let is_quoted = s.len() >= 2
        && ((s.starts_with('"') && s.ends_with('"'))
            || (s.starts_with('\'') && s.ends_with('\'')));

    is_quoted || s.parse::<f64>().is_ok() || matches!(s, "true" | "false" | "null")
}
171
172fn parse_rhs(s: &str) -> FilterValue {
174 if is_literal_value(s) {
175 FilterValue::Literal(unquote(s))
176 } else {
177 FilterValue::Variable(s.to_string())
178 }
179}
180
181fn resolve_filter_value(value: &FilterValue, ctx: &FlowContext) -> Option<String> {
183 match value {
184 FilterValue::Literal(s) => Some(s.clone()),
185 FilterValue::Variable(var_name) => ctx.get_var(var_name).map(value_to_string),
186 }
187}
188
189fn value_to_string(val: &Value) -> String {
191 match val {
192 Value::String(s) => s.clone(),
193 Value::Number(n) => n.to_string(),
194 Value::Bool(b) => b.to_string(),
195 Value::Null => "null".to_string(),
196 _ => val.to_string(),
197 }
198}
199
200fn evaluate(expr: &FilterExpr, ctx: &FlowContext) -> bool {
202 let var = ctx.get_var(&expr.field);
203
204 match expr.op {
205 CompareOp::Exists => var.is_some(),
206 CompareOp::NotExists => var.is_none(),
207 CompareOp::Equal => match (var, &expr.value) {
208 (Some(val), Some(filter_val)) => {
209 match resolve_filter_value(filter_val, ctx) {
210 Some(resolved) => value_matches(val, &resolved),
211 None => false, }
213 }
214 _ => false,
215 },
216 CompareOp::NotEqual => match (var, &expr.value) {
217 (Some(val), Some(filter_val)) => {
218 match resolve_filter_value(filter_val, ctx) {
219 Some(resolved) => !value_matches(val, &resolved),
220 None => true, }
222 }
223 (None, _) => true, _ => true,
225 },
226 }
227}
228
229fn value_matches(val: &Value, expected: &str) -> bool {
231 match val {
232 Value::String(s) => s == expected,
233 Value::Number(n) => n.to_string() == expected,
234 Value::Bool(b) => b.to_string() == expected,
235 Value::Null => expected == "null",
236 _ => false,
237 }
238}
239
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::events::{EntityEvent, FrameworkEvent, LinkEvent};
    use crate::core::link::LinkEntity;
    use crate::core::service::LinkService;
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::Arc;
    use uuid::Uuid;

    /// Link service stand-in: the filter never calls it, so every method is a stub.
    struct MockLinkService;

    #[async_trait]
    impl LinkService for MockLinkService {
        async fn create(&self, _link: LinkEntity) -> Result<LinkEntity> {
            unimplemented!()
        }
        async fn get(&self, _id: &Uuid) -> Result<Option<LinkEntity>> {
            unimplemented!()
        }
        async fn list(&self) -> Result<Vec<LinkEntity>> {
            unimplemented!()
        }
        async fn find_by_source(
            &self,
            _source_id: &Uuid,
            _link_type: Option<&str>,
            _target_type: Option<&str>,
        ) -> Result<Vec<LinkEntity>> {
            unimplemented!()
        }
        async fn find_by_target(
            &self,
            _target_id: &Uuid,
            _link_type: Option<&str>,
            _source_type: Option<&str>,
        ) -> Result<Vec<LinkEntity>> {
            unimplemented!()
        }
        async fn update(&self, _id: &Uuid, _link: LinkEntity) -> Result<LinkEntity> {
            unimplemented!()
        }
        async fn delete(&self, _id: &Uuid) -> Result<()> {
            unimplemented!()
        }
        async fn delete_by_entity(&self, _entity_id: &Uuid) -> Result<()> {
            unimplemented!()
        }
    }

    fn mock_link_service() -> Arc<dyn LinkService> {
        Arc::new(MockLinkService)
    }

    /// Context for a `follows` link-created event between the two given ids.
    fn make_link_context(source_id: Uuid, target_id: Uuid) -> FlowContext {
        let created = LinkEvent::Created {
            link_type: "follows".to_string(),
            link_id: Uuid::new_v4(),
            source_id,
            target_id,
            metadata: None,
        };
        FlowContext::new(
            FrameworkEvent::Link(created),
            mock_link_service(),
            HashMap::new(),
        )
    }

    /// Context for an entity-created event of the given type.
    fn make_entity_context(entity_type: &str) -> FlowContext {
        let created = EntityEvent::Created {
            entity_type: entity_type.to_string(),
            entity_id: Uuid::new_v4(),
            data: json!({"name": "test"}),
        };
        FlowContext::new(
            FrameworkEvent::Entity(created),
            mock_link_service(),
            HashMap::new(),
        )
    }

    /// Parses `condition`, runs the resulting operator against `ctx` and returns
    /// its verdict. Panics if parsing or execution fails.
    async fn run_filter(ctx: &mut FlowContext, condition: &str) -> OpResult {
        let op = FilterOp::from_config(&FilterConfig {
            condition: condition.to_string(),
        })
        .unwrap();
        op.execute(ctx).await.unwrap()
    }

    // --- equality -------------------------------------------------------------

    #[tokio::test]
    async fn test_filter_equal_pass() {
        let mut ctx = make_entity_context("user");
        let outcome = run_filter(&mut ctx, r#"entity_type == "user""#).await;
        assert!(matches!(outcome, OpResult::Continue));
    }

    #[tokio::test]
    async fn test_filter_equal_drop() {
        let mut ctx = make_entity_context("user");
        let outcome = run_filter(&mut ctx, r#"entity_type == "post""#).await;
        assert!(matches!(outcome, OpResult::Drop));
    }

    // --- inequality -----------------------------------------------------------

    #[tokio::test]
    async fn test_filter_not_equal_pass() {
        let mut ctx = make_link_context(Uuid::new_v4(), Uuid::new_v4());
        let outcome = run_filter(&mut ctx, "source_id != target_id").await;
        assert!(matches!(outcome, OpResult::Continue));
    }

    #[tokio::test]
    async fn test_filter_not_equal_drop() {
        let mut ctx = make_entity_context("user");
        let outcome = run_filter(&mut ctx, r#"entity_type != "user""#).await;
        assert!(matches!(outcome, OpResult::Drop));
    }

    // --- exists / not_exists --------------------------------------------------

    #[tokio::test]
    async fn test_filter_exists_pass() {
        let mut ctx = make_entity_context("user");
        let outcome = run_filter(&mut ctx, "entity_type exists").await;
        assert!(matches!(outcome, OpResult::Continue));
    }

    #[tokio::test]
    async fn test_filter_exists_drop() {
        let mut ctx = make_entity_context("user");
        let outcome = run_filter(&mut ctx, "nonexistent exists").await;
        assert!(matches!(outcome, OpResult::Drop));
    }

    #[tokio::test]
    async fn test_filter_not_exists_pass() {
        let mut ctx = make_entity_context("user");
        let outcome = run_filter(&mut ctx, "nonexistent not_exists").await;
        assert!(matches!(outcome, OpResult::Continue));
    }

    #[tokio::test]
    async fn test_filter_not_exists_drop() {
        let mut ctx = make_entity_context("user");
        let outcome = run_filter(&mut ctx, "entity_type not_exists").await;
        assert!(matches!(outcome, OpResult::Drop));
    }

    // --- dotted field access --------------------------------------------------

    #[tokio::test]
    async fn test_filter_dotted_access() {
        let mut ctx = make_entity_context("user");
        ctx.set_var("owner", json!({"name": "Alice", "role": "admin"}));

        let outcome = run_filter(&mut ctx, r#"owner.role == "admin""#).await;
        assert!(matches!(outcome, OpResult::Continue));
    }

    #[tokio::test]
    async fn test_filter_dotted_access_missing() {
        let mut ctx = make_entity_context("user");
        ctx.set_var("owner", json!({"name": "Alice"}));

        let outcome = run_filter(&mut ctx, "owner.role exists").await;
        assert!(matches!(outcome, OpResult::Drop));
    }

    // --- parse errors ---------------------------------------------------------

    #[test]
    fn test_filter_parse_error() {
        let config = FilterConfig {
            condition: "invalid condition without operator".to_string(),
        };
        assert!(FilterOp::from_config(&config).is_err());
    }

    // --- non-string literals --------------------------------------------------

    #[tokio::test]
    async fn test_filter_number_comparison() {
        let mut ctx = make_entity_context("user");
        ctx.set_var("count", json!(42));

        let outcome = run_filter(&mut ctx, "count == 42").await;
        assert!(matches!(outcome, OpResult::Continue));
    }

    #[tokio::test]
    async fn test_filter_boolean_comparison() {
        let mut ctx = make_entity_context("user");
        ctx.set_var("active", json!(true));

        let outcome = run_filter(&mut ctx, "active == true").await;
        assert!(matches!(outcome, OpResult::Continue));
    }

    // --- quoting --------------------------------------------------------------

    #[tokio::test]
    async fn test_filter_single_quotes() {
        let mut ctx = make_entity_context("user");
        let outcome = run_filter(&mut ctx, "entity_type == 'user'").await;
        assert!(matches!(outcome, OpResult::Continue));
    }

    // --- variable-to-variable comparison --------------------------------------

    #[tokio::test]
    async fn test_filter_var_to_var_not_equal_same_uuid_drops() {
        let same_id = Uuid::new_v4();
        let mut ctx = make_link_context(same_id, same_id);

        let outcome = run_filter(&mut ctx, "source_id != target_id").await;
        assert!(
            matches!(outcome, OpResult::Drop),
            "self-link (source_id == target_id) should be dropped by != filter"
        );
    }

    #[tokio::test]
    async fn test_filter_var_to_var_equal_same_uuid_passes() {
        let same_id = Uuid::new_v4();
        let mut ctx = make_link_context(same_id, same_id);

        let outcome = run_filter(&mut ctx, "source_id == target_id").await;
        assert!(matches!(outcome, OpResult::Continue));
    }

    #[tokio::test]
    async fn test_filter_var_to_var_equal_different_uuids_drops() {
        let mut ctx = make_link_context(Uuid::new_v4(), Uuid::new_v4());

        let outcome = run_filter(&mut ctx, "source_id == target_id").await;
        assert!(matches!(outcome, OpResult::Drop));
    }

    #[tokio::test]
    async fn test_filter_quoted_stays_literal() {
        let mut ctx = make_link_context(Uuid::new_v4(), Uuid::new_v4());

        // Quoted, so the right-hand side is the text "target_id", not the variable.
        let outcome = run_filter(&mut ctx, r#"source_id != "target_id""#).await;
        assert!(matches!(outcome, OpResult::Continue));
    }

    #[tokio::test]
    async fn test_filter_unknown_var_fallback() {
        let mut ctx = make_link_context(Uuid::new_v4(), Uuid::new_v4());

        let outcome = run_filter(&mut ctx, "source_id != unknown_var").await;
        assert!(
            matches!(outcome, OpResult::Continue),
            "comparison with unknown variable should be 'not equal'"
        );
    }
}