Skip to main content

this/config/
events.rs

1//! Configuration types for the declarative event flow system
2//!
3//! These structs are deserialized from the `events` section of `this.yaml`.
4//! They define the event backend, declarative flows (trigger → pipeline → deliver),
5//! and consumer groups with seek positions.
6
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
10/// Top-level events configuration
11///
12/// ```yaml
13/// events:
14///   backend:
15///     type: memory
16///   flows:
17///     - name: notify-new-follower
18///       trigger: { kind: link.created, link_type: follows }
19///       pipeline: [...]
20///   consumers:
21///     - name: mobile-feed
22///       seek: last_acknowledged
23/// ```
24#[derive(Debug, Clone, Serialize, Deserialize, Default)]
25pub struct EventsConfig {
26    /// Event backend configuration (memory, nats, kafka, redis)
27    #[serde(default)]
28    pub backend: BackendConfig,
29
30    /// Declarative event flows
31    #[serde(default)]
32    pub flows: Vec<FlowConfig>,
33
34    /// Consumer groups with seek positions
35    #[serde(default)]
36    pub consumers: Vec<ConsumerConfig>,
37}
38
39/// Event backend configuration
40///
41/// ```yaml
42/// backend:
43///   type: memory
44///   config:
45///     retention: 7d
46/// ```
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct BackendConfig {
49    /// Backend type: "memory" (default), "nats", "kafka", "redis"
50    #[serde(rename = "type", default = "default_backend_type")]
51    pub backend_type: String,
52
53    /// Backend-specific configuration (url, stream, retention, replicas, etc.)
54    #[serde(default)]
55    pub config: HashMap<String, serde_json::Value>,
56}
57
58fn default_backend_type() -> String {
59    "memory".to_string()
60}
61
62impl Default for BackendConfig {
63    fn default() -> Self {
64        Self {
65            backend_type: default_backend_type(),
66            config: HashMap::new(),
67        }
68    }
69}
70
71/// A declarative event flow definition
72///
73/// ```yaml
74/// flows:
75///   - name: notify-new-follower
76///     description: "Notify user when someone follows them"
77///     trigger:
78///       kind: link.created
79///       link_type: follows
80///     pipeline:
81///       - resolve:
82///           from: source_id
83///           as: follower
84///       - map:
85///           template:
86///             type: follow
87///             message: "{{ follower.name }} started following you"
88///       - deliver:
89///           sinks: [push-notification, in-app-notification]
90/// ```
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct FlowConfig {
93    /// Unique name for this flow
94    pub name: String,
95
96    /// Human-readable description
97    #[serde(default)]
98    pub description: Option<String>,
99
100    /// Event trigger (what events activate this flow)
101    pub trigger: TriggerConfig,
102
103    /// Pipeline of operators to apply
104    pub pipeline: Vec<PipelineStep>,
105}
106
107/// Event trigger configuration — determines which events activate a flow
108///
109/// ```yaml
110/// trigger:
111///   kind: link.created      # link.created, link.deleted, entity.created, entity.updated, entity.deleted
112///   link_type: follows       # optional: filter by link type
113///   entity_type: user        # optional: filter by entity type
114/// ```
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct TriggerConfig {
117    /// Event kind to match: "link.created", "link.deleted", "entity.created", "entity.updated", "entity.deleted"
118    pub kind: String,
119
120    /// Optional link type filter (only for link events)
121    #[serde(default)]
122    pub link_type: Option<String>,
123
124    /// Optional entity type filter (only for entity events)
125    #[serde(default)]
126    pub entity_type: Option<String>,
127}
128
129/// A single step in the pipeline — wraps a PipelineOp with its config
130///
131/// Each step is a single-key YAML map where the key names the operator:
132/// ```yaml
133/// - resolve:
134///     from: target_id
135///     as: owner
136/// - filter:
137///     condition: "source_id != owner.id"
138/// ```
139///
140/// Uses a custom Serialize/Deserialize to produce clean YAML (map keys instead of YAML tags).
141#[derive(Debug, Clone)]
142pub enum PipelineStep {
143    /// Resolve an entity by ID or by following a link
144    Resolve(ResolveConfig),
145    /// Filter events based on a condition (drop if false)
146    Filter(FilterConfig),
147    /// Fan out to multiple recipients via link resolution (1→N)
148    FanOut(FanOutConfig),
149    /// Batch events by key within a time window
150    Batch(BatchConfig),
151    /// Deduplicate events by key within a sliding window
152    Deduplicate(DeduplicateConfig),
153    /// Transform the payload via a Tera template
154    Map(MapConfig),
155    /// Rate limit the flow
156    RateLimit(RateLimitConfig),
157    /// Deliver to one or more sinks
158    Deliver(DeliverConfig),
159}
160
161const PIPELINE_STEP_VARIANTS: &[&str] = &[
162    "resolve",
163    "filter",
164    "fan_out",
165    "batch",
166    "deduplicate",
167    "map",
168    "rate_limit",
169    "deliver",
170];
171
172impl Serialize for PipelineStep {
173    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
174    where
175        S: serde::Serializer,
176    {
177        use serde::ser::SerializeMap;
178        let mut map = serializer.serialize_map(Some(1))?;
179        match self {
180            PipelineStep::Resolve(c) => map.serialize_entry("resolve", c)?,
181            PipelineStep::Filter(c) => map.serialize_entry("filter", c)?,
182            PipelineStep::FanOut(c) => map.serialize_entry("fan_out", c)?,
183            PipelineStep::Batch(c) => map.serialize_entry("batch", c)?,
184            PipelineStep::Deduplicate(c) => map.serialize_entry("deduplicate", c)?,
185            PipelineStep::Map(c) => map.serialize_entry("map", c)?,
186            PipelineStep::RateLimit(c) => map.serialize_entry("rate_limit", c)?,
187            PipelineStep::Deliver(c) => map.serialize_entry("deliver", c)?,
188        }
189        map.end()
190    }
191}
192
193impl<'de> Deserialize<'de> for PipelineStep {
194    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
195    where
196        D: serde::Deserializer<'de>,
197    {
198        use serde::de::{self, MapAccess, Visitor};
199
200        struct PipelineStepVisitor;
201
202        impl<'de> Visitor<'de> for PipelineStepVisitor {
203            type Value = PipelineStep;
204
205            fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
206                write!(
207                    f,
208                    "a map with a single key naming the pipeline operator (resolve, filter, fan_out, batch, deduplicate, map, rate_limit, deliver)"
209                )
210            }
211
212            fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error>
213            where
214                M: MapAccess<'de>,
215            {
216                let key: String = map
217                    .next_key()?
218                    .ok_or_else(|| de::Error::custom("empty map, expected a pipeline operator"))?;
219                let step = match key.as_str() {
220                    "resolve" => PipelineStep::Resolve(map.next_value()?),
221                    "filter" => PipelineStep::Filter(map.next_value()?),
222                    "fan_out" => PipelineStep::FanOut(map.next_value()?),
223                    "batch" => PipelineStep::Batch(map.next_value()?),
224                    "deduplicate" => PipelineStep::Deduplicate(map.next_value()?),
225                    "map" => PipelineStep::Map(map.next_value()?),
226                    "rate_limit" => PipelineStep::RateLimit(map.next_value()?),
227                    "deliver" => PipelineStep::Deliver(map.next_value()?),
228                    _ => {
229                        return Err(de::Error::unknown_variant(&key, PIPELINE_STEP_VARIANTS));
230                    }
231                };
232                Ok(step)
233            }
234        }
235
236        deserializer.deserialize_map(PipelineStepVisitor)
237    }
238}
239
240/// Configuration for the `resolve` operator
241///
242/// Resolves an entity via its ID or by following a link through the LinkService.
243///
244/// ```yaml
245/// - resolve:
246///     from: target_id       # field containing the entity ID
247///     via: owns              # optional: link type to follow
248///     direction: reverse     # forward (default) or reverse
249///     as: owner              # variable name to store the resolved entity
250/// ```
251#[derive(Debug, Clone, Serialize, Deserialize)]
252pub struct ResolveConfig {
253    /// Field in the event/context containing the entity ID to resolve from
254    pub from: String,
255
256    /// Optional link type to follow (if absent, resolves the entity directly by ID)
257    #[serde(default)]
258    pub via: Option<String>,
259
260    /// Direction to follow the link: "forward" (default) or "reverse"
261    #[serde(default = "default_direction")]
262    pub direction: String,
263
264    /// Variable name to store the resolved entity in the FlowContext
265    #[serde(rename = "as")]
266    pub output_var: String,
267}
268
269fn default_direction() -> String {
270    "forward".to_string()
271}
272
273/// Configuration for the `filter` operator
274///
275/// Drops the event if the condition evaluates to false.
276///
277/// ```yaml
278/// - filter:
279///     condition: "source_id != owner.id"
280/// ```
281#[derive(Debug, Clone, Serialize, Deserialize)]
282pub struct FilterConfig {
283    /// Boolean expression to evaluate against the FlowContext variables
284    /// Supports: ==, !=, >, <, in, not_in, exists, not_exists
285    pub condition: String,
286}
287
288/// Configuration for the `fan_out` operator
289///
290/// Multiplies the event for each entity linked via the specified link type.
291///
292/// ```yaml
293/// - fan_out:
294///     from: source_id
295///     via: follows
296///     direction: reverse
297///     as: follower
298/// ```
299#[derive(Debug, Clone, Serialize, Deserialize)]
300pub struct FanOutConfig {
301    /// Field containing the entity ID to fan out from
302    pub from: String,
303
304    /// Link type to follow for fan-out
305    pub via: String,
306
307    /// Direction to follow the link: "forward" or "reverse"
308    #[serde(default = "default_direction")]
309    pub direction: String,
310
311    /// Variable name for each iterated entity
312    #[serde(rename = "as")]
313    pub output_var: String,
314}
315
316/// Configuration for the `batch` operator
317///
318/// Accumulates events by key within a time window, emitting a single batched event
319/// when the window expires.
320///
321/// ```yaml
322/// - batch:
323///     key: target_id
324///     window: 5m
325///     min_count: 1
326/// ```
327#[derive(Debug, Clone, Serialize, Deserialize)]
328pub struct BatchConfig {
329    /// Field to group events by
330    pub key: String,
331
332    /// Time window duration (e.g., "5m", "1h", "30s")
333    pub window: String,
334
335    /// Minimum number of events before emitting (default: 1)
336    #[serde(default = "default_min_count")]
337    pub min_count: u32,
338}
339
340fn default_min_count() -> u32 {
341    1
342}
343
344/// Configuration for the `deduplicate` operator
345///
346/// Eliminates duplicate events within a sliding time window.
347///
348/// ```yaml
349/// - deduplicate:
350///     key: source_id
351///     window: 1h
352/// ```
353#[derive(Debug, Clone, Serialize, Deserialize)]
354pub struct DeduplicateConfig {
355    /// Field to use as deduplication key
356    pub key: String,
357
358    /// Sliding window duration (e.g., "1h", "30m")
359    pub window: String,
360}
361
362/// Configuration for the `map` operator
363///
364/// Transforms the payload using a Tera template.
365///
366/// ```yaml
367/// - map:
368///     template:
369///       type: like
370///       recipient_id: "{{ owner.id }}"
371///       message: "{{ source.name }} liked your trace"
372/// ```
373#[derive(Debug, Clone, Serialize, Deserialize)]
374pub struct MapConfig {
375    /// Template to render — each value can contain Tera expressions
376    pub template: serde_json::Value,
377}
378
379/// Configuration for the `rate_limit` operator
380///
381/// Limits the throughput of the flow using a token bucket algorithm.
382///
383/// ```yaml
384/// - rate_limit:
385///     max: 100
386///     per: 1s
387///     strategy: drop
388/// ```
389#[derive(Debug, Clone, Serialize, Deserialize)]
390pub struct RateLimitConfig {
391    /// Maximum number of events allowed
392    pub max: u32,
393
394    /// Time period (e.g., "1s", "1m", "1h")
395    pub per: String,
396
397    /// Strategy when limit is exceeded: "drop" (default) or "queue"
398    #[serde(default = "default_rate_limit_strategy")]
399    pub strategy: String,
400}
401
402fn default_rate_limit_strategy() -> String {
403    "drop".to_string()
404}
405
406/// Configuration for the `deliver` operator
407///
408/// Sends the processed event to one or more sinks.
409///
410/// ```yaml
411/// # Single sink
412/// - deliver:
413///     sink: push-notification
414///
415/// # Multiple sinks
416/// - deliver:
417///     sinks: [push-notification, in-app-notification, websocket]
418/// ```
419#[derive(Debug, Clone, Serialize, Deserialize)]
420pub struct DeliverConfig {
421    /// Single sink name (mutually exclusive with `sinks`)
422    #[serde(default)]
423    pub sink: Option<String>,
424
425    /// Multiple sink names (mutually exclusive with `sink`)
426    #[serde(default)]
427    pub sinks: Option<Vec<String>>,
428}
429
430impl DeliverConfig {
431    /// Get all sink names this deliver step targets
432    ///
433    /// If both `sink` and `sinks` are present, they are merged (with a warning).
434    pub fn sink_names(&self) -> Vec<&str> {
435        let mut names = Vec::new();
436
437        // Include the singular `sink` if present
438        if let Some(sink) = &self.sink {
439            names.push(sink.as_str());
440        }
441
442        // Include all `sinks` if present
443        if let Some(sinks) = &self.sinks {
444            for s in sinks {
445                let name = s.as_str();
446                if !names.contains(&name) {
447                    names.push(name);
448                }
449            }
450        }
451
452        if self.sink.is_some() && self.sinks.is_some() {
453            tracing::warn!(
454                "deliver: both 'sink' and 'sinks' are defined — merging them. \
455                 Prefer using only 'sinks' for clarity."
456            );
457        }
458
459        names
460    }
461}
462
463/// Consumer group configuration
464///
465/// ```yaml
466/// consumers:
467///   - name: mobile-feed
468///     seek: last_acknowledged
469///   - name: web-dashboard
470///     seek: latest
471/// ```
472#[derive(Debug, Clone, Serialize, Deserialize)]
473pub struct ConsumerConfig {
474    /// Unique consumer group name
475    pub name: String,
476
477    /// Initial seek position
478    #[serde(default)]
479    pub seek: SeekMode,
480}
481
482/// Seek mode for consumers — determines where to start reading from
483#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
484#[serde(rename_all = "snake_case")]
485#[derive(Default)]
486pub enum SeekMode {
487    /// Start from the very beginning (replay all events)
488    Beginning,
489    /// Resume from the last acknowledged position
490    LastAcknowledged,
491    /// Start from now (only receive future events)
492    #[default]
493    Latest,
494}
495
496#[cfg(test)]
497mod tests {
498    use super::*;
499
500    #[test]
501    fn test_events_config_full_yaml() {
502        let yaml = r#"
503backend:
504  type: memory
505  config:
506    retention: 7d
507flows:
508  - name: notify-new-follower
509    description: "Notify user when someone follows them"
510    trigger:
511      kind: link.created
512      link_type: follows
513    pipeline:
514      - resolve:
515          from: source_id
516          as: follower
517      - resolve:
518          from: target_id
519          as: recipient
520      - map:
521          template:
522            type: follow
523            recipient_id: "{{ recipient.id }}"
524            message: "{{ follower.name }} started following you"
525      - deliver:
526          sinks: [push-notification, in-app-notification]
527  - name: notify-like
528    trigger:
529      kind: link.created
530      link_type: likes
531    pipeline:
532      - resolve:
533          from: target_id
534          via: owns
535          direction: reverse
536          as: owner
537      - filter:
538          condition: "source_id != owner.id"
539      - batch:
540          key: target_id
541          window: 5m
542      - deduplicate:
543          key: source_id
544          window: 1h
545      - map:
546          template:
547            type: like
548            recipient_id: "{{ owner.id }}"
549            message: "{{ batch.count }} people liked your trace"
550      - deliver:
551          sink: push-notification
552consumers:
553  - name: mobile-feed
554    seek: last_acknowledged
555  - name: web-dashboard
556    seek: latest
557"#;
558
559        let config: EventsConfig = serde_yaml::from_str(yaml).unwrap();
560
561        // Backend
562        assert_eq!(config.backend.backend_type, "memory");
563        assert_eq!(
564            config.backend.config.get("retention").unwrap(),
565            &serde_json::Value::String("7d".to_string())
566        );
567
568        // Flows
569        assert_eq!(config.flows.len(), 2);
570        assert_eq!(config.flows[0].name, "notify-new-follower");
571        assert_eq!(
572            config.flows[0].description.as_deref(),
573            Some("Notify user when someone follows them")
574        );
575        assert_eq!(config.flows[0].trigger.kind, "link.created");
576        assert_eq!(
577            config.flows[0].trigger.link_type.as_deref(),
578            Some("follows")
579        );
580        assert_eq!(config.flows[0].pipeline.len(), 4);
581
582        // Check pipeline operators
583        assert!(
584            matches!(&config.flows[0].pipeline[0], PipelineStep::Resolve(r) if r.from == "source_id")
585        );
586        assert!(
587            matches!(&config.flows[0].pipeline[1], PipelineStep::Resolve(r) if r.from == "target_id")
588        );
589        assert!(matches!(&config.flows[0].pipeline[2], PipelineStep::Map(_)));
590        assert!(
591            matches!(&config.flows[0].pipeline[3], PipelineStep::Deliver(d) if d.sink_names().len() == 2)
592        );
593
594        // Second flow with advanced operators
595        assert_eq!(config.flows[1].name, "notify-like");
596        assert_eq!(config.flows[1].pipeline.len(), 6);
597        assert!(
598            matches!(&config.flows[1].pipeline[0], PipelineStep::Resolve(r) if r.via.as_deref() == Some("owns"))
599        );
600        assert!(
601            matches!(&config.flows[1].pipeline[1], PipelineStep::Filter(f) if f.condition == "source_id != owner.id")
602        );
603        assert!(matches!(&config.flows[1].pipeline[2], PipelineStep::Batch(b) if b.window == "5m"));
604        assert!(
605            matches!(&config.flows[1].pipeline[3], PipelineStep::Deduplicate(d) if d.window == "1h")
606        );
607        assert!(
608            matches!(&config.flows[1].pipeline[5], PipelineStep::Deliver(d) if d.sink.as_deref() == Some("push-notification"))
609        );
610
611        // Consumers
612        assert_eq!(config.consumers.len(), 2);
613        assert_eq!(config.consumers[0].name, "mobile-feed");
614        assert_eq!(config.consumers[0].seek, SeekMode::LastAcknowledged);
615        assert_eq!(config.consumers[1].name, "web-dashboard");
616        assert_eq!(config.consumers[1].seek, SeekMode::Latest);
617    }
618
619    #[test]
620    fn test_events_config_minimal() {
621        let yaml = r#"
622flows: []
623"#;
624        let config: EventsConfig = serde_yaml::from_str(yaml).unwrap();
625        assert_eq!(config.backend.backend_type, "memory");
626        assert!(config.flows.is_empty());
627        assert!(config.consumers.is_empty());
628    }
629
630    #[test]
631    fn test_pipeline_step_serde_roundtrip() {
632        let steps = vec![
633            PipelineStep::Resolve(ResolveConfig {
634                from: "source_id".to_string(),
635                via: Some("follows".to_string()),
636                direction: "reverse".to_string(),
637                output_var: "follower".to_string(),
638            }),
639            PipelineStep::Filter(FilterConfig {
640                condition: "source_id != owner.id".to_string(),
641            }),
642            PipelineStep::FanOut(FanOutConfig {
643                from: "source_id".to_string(),
644                via: "follows".to_string(),
645                direction: "reverse".to_string(),
646                output_var: "followers".to_string(),
647            }),
648            PipelineStep::Batch(BatchConfig {
649                key: "target_id".to_string(),
650                window: "5m".to_string(),
651                min_count: 2,
652            }),
653            PipelineStep::Deduplicate(DeduplicateConfig {
654                key: "source_id".to_string(),
655                window: "1h".to_string(),
656            }),
657            PipelineStep::Map(MapConfig {
658                template: serde_json::json!({
659                    "type": "notification",
660                    "message": "{{ follower.name }} followed you"
661                }),
662            }),
663            PipelineStep::RateLimit(RateLimitConfig {
664                max: 100,
665                per: "1m".to_string(),
666                strategy: "drop".to_string(),
667            }),
668            PipelineStep::Deliver(DeliverConfig {
669                sink: None,
670                sinks: Some(vec![
671                    "push-notification".to_string(),
672                    "in-app-notification".to_string(),
673                ]),
674            }),
675        ];
676
677        for step in &steps {
678            let yaml = serde_yaml::to_string(step).unwrap();
679            let roundtrip: PipelineStep = serde_yaml::from_str(&yaml).unwrap();
680            // Verify the variant matches
681            assert_eq!(
682                std::mem::discriminant(step),
683                std::mem::discriminant(&roundtrip)
684            );
685        }
686    }
687
688    #[test]
689    fn test_seek_mode_variants() {
690        let yaml_beginning = "\"beginning\"";
691        let yaml_last = "\"last_acknowledged\"";
692        let yaml_latest = "\"latest\"";
693
694        assert_eq!(
695            serde_json::from_str::<SeekMode>(yaml_beginning).unwrap(),
696            SeekMode::Beginning
697        );
698        assert_eq!(
699            serde_json::from_str::<SeekMode>(yaml_last).unwrap(),
700            SeekMode::LastAcknowledged
701        );
702        assert_eq!(
703            serde_json::from_str::<SeekMode>(yaml_latest).unwrap(),
704            SeekMode::Latest
705        );
706    }
707
708    #[test]
709    fn test_deliver_config_single_sink() {
710        let config = DeliverConfig {
711            sink: Some("push".to_string()),
712            sinks: None,
713        };
714        assert_eq!(config.sink_names(), vec!["push"]);
715    }
716
717    #[test]
718    fn test_deliver_config_multiple_sinks() {
719        let config = DeliverConfig {
720            sink: None,
721            sinks: Some(vec!["push".to_string(), "in-app".to_string()]),
722        };
723        assert_eq!(config.sink_names(), vec!["push", "in-app"]);
724    }
725
726    #[test]
727    fn test_deliver_config_empty() {
728        let config = DeliverConfig {
729            sink: None,
730            sinks: None,
731        };
732        assert!(config.sink_names().is_empty());
733    }
734
735    #[test]
736    fn test_deliver_config_both_sink_and_sinks_merged() {
737        let config = DeliverConfig {
738            sink: Some("push".to_string()),
739            sinks: Some(vec!["in-app".to_string(), "websocket".to_string()]),
740        };
741        let names = config.sink_names();
742        assert_eq!(names.len(), 3);
743        assert!(names.contains(&"push"));
744        assert!(names.contains(&"in-app"));
745        assert!(names.contains(&"websocket"));
746    }
747
748    #[test]
749    fn test_deliver_config_both_with_duplicate_deduped() {
750        let config = DeliverConfig {
751            sink: Some("push".to_string()),
752            sinks: Some(vec!["push".to_string(), "in-app".to_string()]),
753        };
754        let names = config.sink_names();
755        // "push" should appear only once
756        assert_eq!(names.len(), 2);
757        assert_eq!(names, vec!["push", "in-app"]);
758    }
759
760    #[test]
761    fn test_resolve_direction_defaults() {
762        let yaml = r#"
763from: target_id
764as: owner
765"#;
766        let config: ResolveConfig = serde_yaml::from_str(yaml).unwrap();
767        assert_eq!(config.direction, "forward");
768        assert!(config.via.is_none());
769    }
770
771    #[test]
772    fn test_rate_limit_strategy_default() {
773        let yaml = r#"
774max: 50
775per: 1s
776"#;
777        let config: RateLimitConfig = serde_yaml::from_str(yaml).unwrap();
778        assert_eq!(config.strategy, "drop");
779    }
780
781    #[test]
782    fn test_batch_min_count_default() {
783        let yaml = r#"
784key: target_id
785window: 5m
786"#;
787        let config: BatchConfig = serde_yaml::from_str(yaml).unwrap();
788        assert_eq!(config.min_count, 1);
789    }
790
791    #[test]
792    fn test_flow_with_fan_out_pipeline() {
793        let yaml = r#"
794name: feed-update
795trigger:
796  kind: link.created
797  link_type: owns
798pipeline:
799  - resolve:
800      from: source_id
801      as: creator
802  - fan_out:
803      from: source_id
804      via: follows
805      direction: reverse
806      as: follower
807  - map:
808      template:
809        type: feed_update
810        recipient_id: "{{ follower.id }}"
811        message: "{{ creator.name }} posted a new trace"
812  - deliver:
813      sinks: [in-app-notification, websocket]
814"#;
815
816        let flow: FlowConfig = serde_yaml::from_str(yaml).unwrap();
817        assert_eq!(flow.name, "feed-update");
818        assert_eq!(flow.pipeline.len(), 4);
819        assert!(
820            matches!(&flow.pipeline[1], PipelineStep::FanOut(f) if f.via == "follows" && f.direction == "reverse")
821        );
822    }
823
824    #[test]
825    fn test_trigger_entity_event() {
826        let yaml = r#"
827kind: entity.created
828entity_type: user
829"#;
830        let trigger: TriggerConfig = serde_yaml::from_str(yaml).unwrap();
831        assert_eq!(trigger.kind, "entity.created");
832        assert_eq!(trigger.entity_type.as_deref(), Some("user"));
833        assert!(trigger.link_type.is_none());
834    }
835
836    #[test]
837    fn test_trigger_wildcard() {
838        let yaml = r#"
839kind: link.created
840"#;
841        let trigger: TriggerConfig = serde_yaml::from_str(yaml).unwrap();
842        assert_eq!(trigger.kind, "link.created");
843        assert!(trigger.link_type.is_none());
844        assert!(trigger.entity_type.is_none());
845    }
846}