Skip to main content

this/events/operators/
mod.rs

//! Pipeline operators for declarative event flows
//!
//! Each operator implements the `PipelineOperator` trait and processes a
//! `FlowContext` during pipeline execution. Operators are compiled from
//! YAML `PipelineStep` configurations.
//!
//! # Operator types
//!
//! **Synchronous (1:1)** — at most one output event per input event:
//! - `resolve` — Resolve an entity by ID or by following a link
//! - `filter` — Drop events that don't match a condition
//! - `map` — Transform the payload via a Tera template
//! - `deliver` — Send to one or more sinks
//!
//! **Stateful (1:N or N:1)** — change cardinality:
//! - `fan_out` — Multiply event for each linked entity (see T2.3)
//! - `batch` — Accumulate events and flush on window expiry (see T2.3)
//! - `deduplicate` — Remove duplicates within a window (see T2.3)
//! - `rate_limit` — Throttle via token bucket (see T2.3)
20
21pub mod batch;
22pub mod deduplicate;
23pub mod deliver;
24pub mod fan_out;
25pub mod filter;
26pub mod map;
27pub mod rate_limit;
28pub mod resolve;
29
30pub use batch::BatchOp;
31pub use deduplicate::DeduplicateOp;
32pub use deliver::DeliverOp;
33pub use fan_out::FanOutOp;
34pub use filter::FilterOp;
35pub use map::MapOp;
36pub use rate_limit::RateLimitOp;
37pub use resolve::ResolveOp;
38
39use crate::events::context::FlowContext;
40use anyhow::Result;
41use async_trait::async_trait;
42
43/// Result of executing a pipeline operator
44#[derive(Debug)]
45pub enum OpResult {
46    /// Continue to the next operator in the pipeline
47    Continue,
48
49    /// Drop this event — stop pipeline execution for this event
50    Drop,
51
52    /// Fan out into multiple contexts (one per element)
53    ///
54    /// Each resulting FlowContext will continue through the remaining
55    /// pipeline operators independently.
56    FanOut(Vec<FlowContext>),
57}
58
59/// Trait for pipeline operators
60///
61/// Each operator receives a mutable `FlowContext` and returns an `OpResult`
62/// indicating whether to continue, drop, or fan out.
63///
64/// # Implementors
65///
66/// - `ResolveOp` — resolves entities via LinkService/EntityFetcher
67/// - `FilterOp` — evaluates boolean conditions
68/// - `MapOp` — transforms payload via Tera templates
69/// - `DeliverOp` — delivers to sinks
70#[async_trait]
71pub trait PipelineOperator: Send + Sync + std::fmt::Debug {
72    /// Execute this operator on the given context
73    ///
74    /// May modify the context (e.g., adding variables) and returns
75    /// an `OpResult` indicating how to proceed.
76    async fn execute(&self, ctx: &mut FlowContext) -> Result<OpResult>;
77
78    /// Human-readable name for logging/debugging
79    fn name(&self) -> &str;
80}