1use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
10#[derive(Debug, Clone, Serialize, Deserialize, Default)]
25pub struct EventsConfig {
26 #[serde(default)]
28 pub backend: BackendConfig,
29
30 #[serde(default)]
32 pub flows: Vec<FlowConfig>,
33
34 #[serde(default)]
36 pub consumers: Vec<ConsumerConfig>,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct BackendConfig {
49 #[serde(rename = "type", default = "default_backend_type")]
51 pub backend_type: String,
52
53 #[serde(default)]
55 pub config: HashMap<String, serde_json::Value>,
56}
57
58fn default_backend_type() -> String {
59 "memory".to_string()
60}
61
62impl Default for BackendConfig {
63 fn default() -> Self {
64 Self {
65 backend_type: default_backend_type(),
66 config: HashMap::new(),
67 }
68 }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct FlowConfig {
93 pub name: String,
95
96 #[serde(default)]
98 pub description: Option<String>,
99
100 pub trigger: TriggerConfig,
102
103 pub pipeline: Vec<PipelineStep>,
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct TriggerConfig {
117 pub kind: String,
119
120 #[serde(default)]
122 pub link_type: Option<String>,
123
124 #[serde(default)]
126 pub entity_type: Option<String>,
127}
128
129#[derive(Debug, Clone)]
142pub enum PipelineStep {
143 Resolve(ResolveConfig),
145 Filter(FilterConfig),
147 FanOut(FanOutConfig),
149 Batch(BatchConfig),
151 Deduplicate(DeduplicateConfig),
153 Map(MapConfig),
155 RateLimit(RateLimitConfig),
157 Deliver(DeliverConfig),
159}
160
161const PIPELINE_STEP_VARIANTS: &[&str] = &[
162 "resolve",
163 "filter",
164 "fan_out",
165 "batch",
166 "deduplicate",
167 "map",
168 "rate_limit",
169 "deliver",
170];
171
172impl Serialize for PipelineStep {
173 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
174 where
175 S: serde::Serializer,
176 {
177 use serde::ser::SerializeMap;
178 let mut map = serializer.serialize_map(Some(1))?;
179 match self {
180 PipelineStep::Resolve(c) => map.serialize_entry("resolve", c)?,
181 PipelineStep::Filter(c) => map.serialize_entry("filter", c)?,
182 PipelineStep::FanOut(c) => map.serialize_entry("fan_out", c)?,
183 PipelineStep::Batch(c) => map.serialize_entry("batch", c)?,
184 PipelineStep::Deduplicate(c) => map.serialize_entry("deduplicate", c)?,
185 PipelineStep::Map(c) => map.serialize_entry("map", c)?,
186 PipelineStep::RateLimit(c) => map.serialize_entry("rate_limit", c)?,
187 PipelineStep::Deliver(c) => map.serialize_entry("deliver", c)?,
188 }
189 map.end()
190 }
191}
192
193impl<'de> Deserialize<'de> for PipelineStep {
194 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
195 where
196 D: serde::Deserializer<'de>,
197 {
198 use serde::de::{self, MapAccess, Visitor};
199
200 struct PipelineStepVisitor;
201
202 impl<'de> Visitor<'de> for PipelineStepVisitor {
203 type Value = PipelineStep;
204
205 fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
206 write!(
207 f,
208 "a map with a single key naming the pipeline operator (resolve, filter, fan_out, batch, deduplicate, map, rate_limit, deliver)"
209 )
210 }
211
212 fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error>
213 where
214 M: MapAccess<'de>,
215 {
216 let key: String = map
217 .next_key()?
218 .ok_or_else(|| de::Error::custom("empty map, expected a pipeline operator"))?;
219 let step = match key.as_str() {
220 "resolve" => PipelineStep::Resolve(map.next_value()?),
221 "filter" => PipelineStep::Filter(map.next_value()?),
222 "fan_out" => PipelineStep::FanOut(map.next_value()?),
223 "batch" => PipelineStep::Batch(map.next_value()?),
224 "deduplicate" => PipelineStep::Deduplicate(map.next_value()?),
225 "map" => PipelineStep::Map(map.next_value()?),
226 "rate_limit" => PipelineStep::RateLimit(map.next_value()?),
227 "deliver" => PipelineStep::Deliver(map.next_value()?),
228 _ => {
229 return Err(de::Error::unknown_variant(&key, PIPELINE_STEP_VARIANTS));
230 }
231 };
232 Ok(step)
233 }
234 }
235
236 deserializer.deserialize_map(PipelineStepVisitor)
237 }
238}
239
240#[derive(Debug, Clone, Serialize, Deserialize)]
252pub struct ResolveConfig {
253 pub from: String,
255
256 #[serde(default)]
258 pub via: Option<String>,
259
260 #[serde(default = "default_direction")]
262 pub direction: String,
263
264 #[serde(rename = "as")]
266 pub output_var: String,
267}
268
269fn default_direction() -> String {
270 "forward".to_string()
271}
272
273#[derive(Debug, Clone, Serialize, Deserialize)]
282pub struct FilterConfig {
283 pub condition: String,
286}
287
288#[derive(Debug, Clone, Serialize, Deserialize)]
300pub struct FanOutConfig {
301 pub from: String,
303
304 pub via: String,
306
307 #[serde(default = "default_direction")]
309 pub direction: String,
310
311 #[serde(rename = "as")]
313 pub output_var: String,
314}
315
316#[derive(Debug, Clone, Serialize, Deserialize)]
328pub struct BatchConfig {
329 pub key: String,
331
332 pub window: String,
334
335 #[serde(default = "default_min_count")]
337 pub min_count: u32,
338}
339
340fn default_min_count() -> u32 {
341 1
342}
343
344#[derive(Debug, Clone, Serialize, Deserialize)]
354pub struct DeduplicateConfig {
355 pub key: String,
357
358 pub window: String,
360}
361
362#[derive(Debug, Clone, Serialize, Deserialize)]
374pub struct MapConfig {
375 pub template: serde_json::Value,
377}
378
379#[derive(Debug, Clone, Serialize, Deserialize)]
390pub struct RateLimitConfig {
391 pub max: u32,
393
394 pub per: String,
396
397 #[serde(default = "default_rate_limit_strategy")]
399 pub strategy: String,
400}
401
402fn default_rate_limit_strategy() -> String {
403 "drop".to_string()
404}
405
406#[derive(Debug, Clone, Serialize, Deserialize)]
420pub struct DeliverConfig {
421 #[serde(default)]
423 pub sink: Option<String>,
424
425 #[serde(default)]
427 pub sinks: Option<Vec<String>>,
428}
429
430impl DeliverConfig {
431 pub fn sink_names(&self) -> Vec<&str> {
435 let mut names = Vec::new();
436
437 if let Some(sink) = &self.sink {
439 names.push(sink.as_str());
440 }
441
442 if let Some(sinks) = &self.sinks {
444 for s in sinks {
445 let name = s.as_str();
446 if !names.contains(&name) {
447 names.push(name);
448 }
449 }
450 }
451
452 if self.sink.is_some() && self.sinks.is_some() {
453 tracing::warn!(
454 "deliver: both 'sink' and 'sinks' are defined — merging them. \
455 Prefer using only 'sinks' for clarity."
456 );
457 }
458
459 names
460 }
461}
462
463#[derive(Debug, Clone, Serialize, Deserialize)]
473pub struct ConsumerConfig {
474 pub name: String,
476
477 #[serde(default)]
479 pub seek: SeekMode,
480}
481
482#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
484#[serde(rename_all = "snake_case")]
485#[derive(Default)]
486pub enum SeekMode {
487 Beginning,
489 LastAcknowledged,
491 #[default]
493 Latest,
494}
495
496#[cfg(test)]
497mod tests {
498 use super::*;
499
500 #[test]
501 fn test_events_config_full_yaml() {
502 let yaml = r#"
503backend:
504 type: memory
505 config:
506 retention: 7d
507flows:
508 - name: notify-new-follower
509 description: "Notify user when someone follows them"
510 trigger:
511 kind: link.created
512 link_type: follows
513 pipeline:
514 - resolve:
515 from: source_id
516 as: follower
517 - resolve:
518 from: target_id
519 as: recipient
520 - map:
521 template:
522 type: follow
523 recipient_id: "{{ recipient.id }}"
524 message: "{{ follower.name }} started following you"
525 - deliver:
526 sinks: [push-notification, in-app-notification]
527 - name: notify-like
528 trigger:
529 kind: link.created
530 link_type: likes
531 pipeline:
532 - resolve:
533 from: target_id
534 via: owns
535 direction: reverse
536 as: owner
537 - filter:
538 condition: "source_id != owner.id"
539 - batch:
540 key: target_id
541 window: 5m
542 - deduplicate:
543 key: source_id
544 window: 1h
545 - map:
546 template:
547 type: like
548 recipient_id: "{{ owner.id }}"
549 message: "{{ batch.count }} people liked your trace"
550 - deliver:
551 sink: push-notification
552consumers:
553 - name: mobile-feed
554 seek: last_acknowledged
555 - name: web-dashboard
556 seek: latest
557"#;
558
559 let config: EventsConfig = serde_yaml::from_str(yaml).unwrap();
560
561 assert_eq!(config.backend.backend_type, "memory");
563 assert_eq!(
564 config.backend.config.get("retention").unwrap(),
565 &serde_json::Value::String("7d".to_string())
566 );
567
568 assert_eq!(config.flows.len(), 2);
570 assert_eq!(config.flows[0].name, "notify-new-follower");
571 assert_eq!(
572 config.flows[0].description.as_deref(),
573 Some("Notify user when someone follows them")
574 );
575 assert_eq!(config.flows[0].trigger.kind, "link.created");
576 assert_eq!(
577 config.flows[0].trigger.link_type.as_deref(),
578 Some("follows")
579 );
580 assert_eq!(config.flows[0].pipeline.len(), 4);
581
582 assert!(
584 matches!(&config.flows[0].pipeline[0], PipelineStep::Resolve(r) if r.from == "source_id")
585 );
586 assert!(
587 matches!(&config.flows[0].pipeline[1], PipelineStep::Resolve(r) if r.from == "target_id")
588 );
589 assert!(matches!(&config.flows[0].pipeline[2], PipelineStep::Map(_)));
590 assert!(
591 matches!(&config.flows[0].pipeline[3], PipelineStep::Deliver(d) if d.sink_names().len() == 2)
592 );
593
594 assert_eq!(config.flows[1].name, "notify-like");
596 assert_eq!(config.flows[1].pipeline.len(), 6);
597 assert!(
598 matches!(&config.flows[1].pipeline[0], PipelineStep::Resolve(r) if r.via.as_deref() == Some("owns"))
599 );
600 assert!(
601 matches!(&config.flows[1].pipeline[1], PipelineStep::Filter(f) if f.condition == "source_id != owner.id")
602 );
603 assert!(matches!(&config.flows[1].pipeline[2], PipelineStep::Batch(b) if b.window == "5m"));
604 assert!(
605 matches!(&config.flows[1].pipeline[3], PipelineStep::Deduplicate(d) if d.window == "1h")
606 );
607 assert!(
608 matches!(&config.flows[1].pipeline[5], PipelineStep::Deliver(d) if d.sink.as_deref() == Some("push-notification"))
609 );
610
611 assert_eq!(config.consumers.len(), 2);
613 assert_eq!(config.consumers[0].name, "mobile-feed");
614 assert_eq!(config.consumers[0].seek, SeekMode::LastAcknowledged);
615 assert_eq!(config.consumers[1].name, "web-dashboard");
616 assert_eq!(config.consumers[1].seek, SeekMode::Latest);
617 }
618
619 #[test]
620 fn test_events_config_minimal() {
621 let yaml = r#"
622flows: []
623"#;
624 let config: EventsConfig = serde_yaml::from_str(yaml).unwrap();
625 assert_eq!(config.backend.backend_type, "memory");
626 assert!(config.flows.is_empty());
627 assert!(config.consumers.is_empty());
628 }
629
630 #[test]
631 fn test_pipeline_step_serde_roundtrip() {
632 let steps = vec![
633 PipelineStep::Resolve(ResolveConfig {
634 from: "source_id".to_string(),
635 via: Some("follows".to_string()),
636 direction: "reverse".to_string(),
637 output_var: "follower".to_string(),
638 }),
639 PipelineStep::Filter(FilterConfig {
640 condition: "source_id != owner.id".to_string(),
641 }),
642 PipelineStep::FanOut(FanOutConfig {
643 from: "source_id".to_string(),
644 via: "follows".to_string(),
645 direction: "reverse".to_string(),
646 output_var: "followers".to_string(),
647 }),
648 PipelineStep::Batch(BatchConfig {
649 key: "target_id".to_string(),
650 window: "5m".to_string(),
651 min_count: 2,
652 }),
653 PipelineStep::Deduplicate(DeduplicateConfig {
654 key: "source_id".to_string(),
655 window: "1h".to_string(),
656 }),
657 PipelineStep::Map(MapConfig {
658 template: serde_json::json!({
659 "type": "notification",
660 "message": "{{ follower.name }} followed you"
661 }),
662 }),
663 PipelineStep::RateLimit(RateLimitConfig {
664 max: 100,
665 per: "1m".to_string(),
666 strategy: "drop".to_string(),
667 }),
668 PipelineStep::Deliver(DeliverConfig {
669 sink: None,
670 sinks: Some(vec![
671 "push-notification".to_string(),
672 "in-app-notification".to_string(),
673 ]),
674 }),
675 ];
676
677 for step in &steps {
678 let yaml = serde_yaml::to_string(step).unwrap();
679 let roundtrip: PipelineStep = serde_yaml::from_str(&yaml).unwrap();
680 assert_eq!(
682 std::mem::discriminant(step),
683 std::mem::discriminant(&roundtrip)
684 );
685 }
686 }
687
688 #[test]
689 fn test_seek_mode_variants() {
690 let yaml_beginning = "\"beginning\"";
691 let yaml_last = "\"last_acknowledged\"";
692 let yaml_latest = "\"latest\"";
693
694 assert_eq!(
695 serde_json::from_str::<SeekMode>(yaml_beginning).unwrap(),
696 SeekMode::Beginning
697 );
698 assert_eq!(
699 serde_json::from_str::<SeekMode>(yaml_last).unwrap(),
700 SeekMode::LastAcknowledged
701 );
702 assert_eq!(
703 serde_json::from_str::<SeekMode>(yaml_latest).unwrap(),
704 SeekMode::Latest
705 );
706 }
707
708 #[test]
709 fn test_deliver_config_single_sink() {
710 let config = DeliverConfig {
711 sink: Some("push".to_string()),
712 sinks: None,
713 };
714 assert_eq!(config.sink_names(), vec!["push"]);
715 }
716
717 #[test]
718 fn test_deliver_config_multiple_sinks() {
719 let config = DeliverConfig {
720 sink: None,
721 sinks: Some(vec!["push".to_string(), "in-app".to_string()]),
722 };
723 assert_eq!(config.sink_names(), vec!["push", "in-app"]);
724 }
725
726 #[test]
727 fn test_deliver_config_empty() {
728 let config = DeliverConfig {
729 sink: None,
730 sinks: None,
731 };
732 assert!(config.sink_names().is_empty());
733 }
734
735 #[test]
736 fn test_deliver_config_both_sink_and_sinks_merged() {
737 let config = DeliverConfig {
738 sink: Some("push".to_string()),
739 sinks: Some(vec!["in-app".to_string(), "websocket".to_string()]),
740 };
741 let names = config.sink_names();
742 assert_eq!(names.len(), 3);
743 assert!(names.contains(&"push"));
744 assert!(names.contains(&"in-app"));
745 assert!(names.contains(&"websocket"));
746 }
747
748 #[test]
749 fn test_deliver_config_both_with_duplicate_deduped() {
750 let config = DeliverConfig {
751 sink: Some("push".to_string()),
752 sinks: Some(vec!["push".to_string(), "in-app".to_string()]),
753 };
754 let names = config.sink_names();
755 assert_eq!(names.len(), 2);
757 assert_eq!(names, vec!["push", "in-app"]);
758 }
759
760 #[test]
761 fn test_resolve_direction_defaults() {
762 let yaml = r#"
763from: target_id
764as: owner
765"#;
766 let config: ResolveConfig = serde_yaml::from_str(yaml).unwrap();
767 assert_eq!(config.direction, "forward");
768 assert!(config.via.is_none());
769 }
770
771 #[test]
772 fn test_rate_limit_strategy_default() {
773 let yaml = r#"
774max: 50
775per: 1s
776"#;
777 let config: RateLimitConfig = serde_yaml::from_str(yaml).unwrap();
778 assert_eq!(config.strategy, "drop");
779 }
780
781 #[test]
782 fn test_batch_min_count_default() {
783 let yaml = r#"
784key: target_id
785window: 5m
786"#;
787 let config: BatchConfig = serde_yaml::from_str(yaml).unwrap();
788 assert_eq!(config.min_count, 1);
789 }
790
791 #[test]
792 fn test_flow_with_fan_out_pipeline() {
793 let yaml = r#"
794name: feed-update
795trigger:
796 kind: link.created
797 link_type: owns
798pipeline:
799 - resolve:
800 from: source_id
801 as: creator
802 - fan_out:
803 from: source_id
804 via: follows
805 direction: reverse
806 as: follower
807 - map:
808 template:
809 type: feed_update
810 recipient_id: "{{ follower.id }}"
811 message: "{{ creator.name }} posted a new trace"
812 - deliver:
813 sinks: [in-app-notification, websocket]
814"#;
815
816 let flow: FlowConfig = serde_yaml::from_str(yaml).unwrap();
817 assert_eq!(flow.name, "feed-update");
818 assert_eq!(flow.pipeline.len(), 4);
819 assert!(
820 matches!(&flow.pipeline[1], PipelineStep::FanOut(f) if f.via == "follows" && f.direction == "reverse")
821 );
822 }
823
824 #[test]
825 fn test_trigger_entity_event() {
826 let yaml = r#"
827kind: entity.created
828entity_type: user
829"#;
830 let trigger: TriggerConfig = serde_yaml::from_str(yaml).unwrap();
831 assert_eq!(trigger.kind, "entity.created");
832 assert_eq!(trigger.entity_type.as_deref(), Some("user"));
833 assert!(trigger.link_type.is_none());
834 }
835
836 #[test]
837 fn test_trigger_wildcard() {
838 let yaml = r#"
839kind: link.created
840"#;
841 let trigger: TriggerConfig = serde_yaml::from_str(yaml).unwrap();
842 assert_eq!(trigger.kind, "link.created");
843 assert!(trigger.link_type.is_none());
844 assert!(trigger.entity_type.is_none());
845 }
846}