1use std::collections::HashMap;
19
20use anyhow::{Context, Result, bail};
21use serde_json::Value;
22
23use crate::Courier;
24use crate::config::{Config, PipelineSpec, SinkSpec, SourceSpec, TransformSpec, redact_secret};
25use crate::observability::init_metrics;
26use crate::pipeline::{ErrorPolicy, Pipeline};
27use crate::retry::RetryPolicy;
28use crate::sinks::Sink;
29use crate::sources::Source;
30use crate::transforms::Transform;
31
32pub trait SourceFactory: Send + Sync {
37 fn build(&self, id: &str, config: Value, retry: Option<RetryPolicy>)
38 -> Result<Box<dyn Source>>;
39}
40
41impl<F> SourceFactory for F
42where
43 F: Fn(&str, Value, Option<RetryPolicy>) -> Result<Box<dyn Source>> + Send + Sync,
44{
45 fn build(
46 &self,
47 id: &str,
48 config: Value,
49 retry: Option<RetryPolicy>,
50 ) -> Result<Box<dyn Source>> {
51 (self)(id, config, retry)
52 }
53}
54
55pub trait TransformFactory: Send + Sync {
58 fn build(&self, id: &str, config: Value, on_error: ErrorPolicy) -> Result<Box<dyn Transform>>;
59}
60
61impl<F> TransformFactory for F
62where
63 F: Fn(&str, Value, ErrorPolicy) -> Result<Box<dyn Transform>> + Send + Sync,
64{
65 fn build(&self, id: &str, config: Value, on_error: ErrorPolicy) -> Result<Box<dyn Transform>> {
66 (self)(id, config, on_error)
67 }
68}
69
70pub trait SinkFactory: Send + Sync {
75 fn build(
76 &self,
77 id: &str,
78 config: Value,
79 on_error: ErrorPolicy,
80 retry: Option<RetryPolicy>,
81 ) -> Result<Box<dyn Sink>>;
82}
83
84impl<F> SinkFactory for F
85where
86 F: Fn(&str, Value, ErrorPolicy, Option<RetryPolicy>) -> Result<Box<dyn Sink>> + Send + Sync,
87{
88 fn build(
89 &self,
90 id: &str,
91 config: Value,
92 on_error: ErrorPolicy,
93 retry: Option<RetryPolicy>,
94 ) -> Result<Box<dyn Sink>> {
95 (self)(id, config, on_error, retry)
96 }
97}
98
99#[derive(Default)]
104pub struct Registry {
105 source_factories: HashMap<String, Box<dyn SourceFactory>>,
106 transform_factories: HashMap<String, Box<dyn TransformFactory>>,
107 sink_factories: HashMap<String, Box<dyn SinkFactory>>,
108}
109
110impl Registry {
111 pub fn with_builtins() -> Result<Self> {
115 let mut registry = Self::default();
116 register_builtin(&mut registry)?;
117 Ok(registry)
118 }
119
120 pub fn register_source(
121 &mut self,
122 kind: impl Into<String>,
123 factory: impl SourceFactory + 'static,
124 ) -> Result<()> {
125 let kind = kind.into();
126 if self.source_factories.contains_key(&kind) {
127 bail!("source factory '{kind}' already registered");
128 }
129 self.source_factories.insert(kind, Box::new(factory));
130 Ok(())
131 }
132
133 pub fn register_transform(
134 &mut self,
135 kind: impl Into<String>,
136 factory: impl TransformFactory + 'static,
137 ) -> Result<()> {
138 let kind = kind.into();
139 if self.transform_factories.contains_key(&kind) {
140 bail!("transform factory '{kind}' already registered");
141 }
142 self.transform_factories.insert(kind, Box::new(factory));
143 Ok(())
144 }
145
146 pub fn register_sink(
147 &mut self,
148 kind: impl Into<String>,
149 factory: impl SinkFactory + 'static,
150 ) -> Result<()> {
151 let kind = kind.into();
152 if self.sink_factories.contains_key(&kind) {
153 bail!("sink factory '{kind}' already registered");
154 }
155 self.sink_factories.insert(kind, Box::new(factory));
156 Ok(())
157 }
158
159 pub fn build_source(&self, id: &str, spec: SourceSpec) -> Result<Box<dyn Source>> {
160 let kind = spec.kind;
161 let retry = spec.retry;
162 let factory = self
163 .source_factories
164 .get(&kind)
165 .with_context(|| format!("unknown source type '{}'", redact_secret(&kind)))?;
166 factory
167 .build(id, spec.config, retry)
168 .with_context(|| format!("failed to build source '{}'", redact_secret(&kind)))
169 }
170
171 pub fn build_transform(&self, id: &str, spec: TransformSpec) -> Result<Box<dyn Transform>> {
172 let kind = spec.kind;
173 let on_error = spec.on_error.unwrap_or_default().into();
174 let factory = self
175 .transform_factories
176 .get(&kind)
177 .with_context(|| format!("unknown transform type '{}'", redact_secret(&kind)))?;
178 factory
179 .build(id, spec.config, on_error)
180 .with_context(|| format!("failed to build transform '{}'", redact_secret(&kind)))
181 }
182
183 pub fn build_sink(&self, id: &str, spec: SinkSpec) -> Result<Box<dyn Sink>> {
184 let kind = spec.kind;
185 let on_error = spec.on_error.unwrap_or_default().into();
186 let retry = spec.retry;
187 let factory = self
188 .sink_factories
189 .get(&kind)
190 .with_context(|| format!("unknown sink type '{}'", redact_secret(&kind)))?;
191 factory
192 .build(id, spec.config, on_error, retry)
193 .with_context(|| format!("failed to build sink '{}'", redact_secret(&kind)))
194 }
195
196 pub fn build_courier(&self, config: Config) -> Result<Courier> {
200 config.validate()?;
201
202 let observability = config.observability.clone();
203 let metrics = init_metrics(observability.as_ref())?;
204 let mut pipelines = Vec::with_capacity(config.pipelines.len());
205 for spec in config.pipelines {
206 let name = spec.name.clone();
207 let mut pipeline = self
208 .build_pipeline(spec)
209 .with_context(|| format!("failed to build pipeline '{}'", redact_secret(&name)))?;
210 pipeline = pipeline.with_observability(Some(metrics.clone()));
211 pipelines.push(pipeline);
212 }
213 Ok(Courier::new(pipelines)
214 .with_observability(observability)
215 .with_metrics(metrics))
216 }
217
218 pub fn dry_run_build(&self, config: Config) -> Result<()> {
223 config.validate()?;
224 for spec in config.pipelines {
225 let name = spec.name.clone();
226 self.build_pipeline(spec)
227 .with_context(|| format!("failed to build pipeline '{}'", redact_secret(&name)))?;
228 }
229 Ok(())
230 }
231
232 fn build_pipeline(&self, spec: PipelineSpec) -> Result<Pipeline> {
233 let name = spec.name;
234 let source = self
235 .build_source(&format!("{name}/src"), spec.source)
236 .with_context(|| format!("pipeline '{}' source", redact_secret(&name)))?;
237
238 let mut pipeline = Pipeline::new(&name, source);
239 if let Some(capacity) = spec.channel_capacity {
240 pipeline = pipeline.with_channel_capacity(capacity);
241 }
242
243 for (i, transform) in spec.transforms.into_iter().enumerate() {
244 let id = format!("{name}/t{i}");
245 pipeline =
246 pipeline.with_transform(self.build_transform(&id, transform).with_context(
247 || format!("pipeline '{}' transform[{i}]", redact_secret(&name)),
248 )?);
249 }
250
251 for (i, sink) in spec.sinks.into_iter().enumerate() {
252 let id = format!("{name}/sink{i}");
253 pipeline = pipeline.with_sink(
254 self.build_sink(&id, sink)
255 .with_context(|| format!("pipeline '{}' sink[{i}]", redact_secret(&name)))?,
256 );
257 }
258
259 Ok(pipeline)
260 }
261
262 pub fn source_kinds(&self) -> impl Iterator<Item = &str> {
265 self.source_factories.keys().map(String::as_str)
266 }
267
268 pub fn transform_kinds(&self) -> impl Iterator<Item = &str> {
269 self.transform_factories.keys().map(String::as_str)
270 }
271
272 pub fn sink_kinds(&self) -> impl Iterator<Item = &str> {
273 self.sink_factories.keys().map(String::as_str)
274 }
275}
276
277pub fn register_builtin(registry: &mut Registry) -> Result<()> {
285 registry.register_source("api_poll", crate::sources::api::api_poll_source_factory)?;
286 registry.register_source(
287 "http_webhook",
288 crate::sources::http_webhook::http_webhook_source_factory,
289 )?;
290 registry.register_source("kafka", crate::sources::kafka::kafka_source_factory)?;
291 registry.register_source(
292 "sql_query_poll",
293 crate::sources::sql::sql_query_poll_source_factory,
294 )?;
295 registry.register_transform("batch", crate::transforms::batch::batch_transform_factory)?;
296 registry.register_transform(
297 "filter",
298 crate::transforms::filter::filter_transform_factory,
299 )?;
300 registry.register_transform(
301 "mutate",
302 crate::transforms::mutate::mutate_transform_factory,
303 )?;
304 registry.register_transform(
305 "script",
306 crate::transforms::script::script_transform_factory,
307 )?;
308 registry.register_transform(
309 "set_key",
310 crate::transforms::set_key::set_key_transform_factory,
311 )?;
312 registry.register_sink("api", crate::sinks::api::api_sink_factory)?;
313 registry.register_sink("file", crate::sinks::file::file_sink_factory)?;
314 registry.register_sink("kafka", crate::sinks::kafka::kafka_sink_factory)?;
315 registry.register_sink("sql", crate::sinks::sql::sql_sink_factory)?;
316 Ok(())
317}
318
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use anyhow::anyhow;
    use async_trait::async_trait;
    use serde_json::{Value, json};
    use tokio::sync::mpsc::{Receiver, Sender};
    use tokio_util::sync::CancellationToken;

    use super::*;
    use crate::config::{ErrorPolicyConfig, PipelineSpec, SinkSpec, SourceSpec, TransformSpec};
    use crate::envelope::Envelope;
    use crate::retry::RetryPolicy;

    /// Source stub that only remembers the id it was built with.
    struct NoopSource(String);

    #[async_trait]
    impl Source for NoopSource {
        fn id(&self) -> &str {
            self.0.as_str()
        }

        async fn run(self: Box<Self>, _tx: Sender<Envelope>, _cancel: CancellationToken) {}
    }

    /// Transform stub that only remembers the id it was built with.
    struct NoopTransform(String);

    #[async_trait]
    impl Transform for NoopTransform {
        fn id(&self) -> &str {
            self.0.as_str()
        }

        async fn run(
            self: Box<Self>,
            _rx: Receiver<Envelope>,
            _tx: Sender<Envelope>,
            _cancel: CancellationToken,
        ) {
        }
    }

    /// Sink stub that only remembers the id it was built with.
    struct NoopSink(String);

    #[async_trait]
    impl Sink for NoopSink {
        fn id(&self) -> &str {
            self.0.as_str()
        }

        async fn run(self: Box<Self>, _rx: Receiver<Envelope>, _cancel: CancellationToken) {}
    }

    /// Source factory that ignores its config and retry policy.
    fn noop_source(
        id: &str,
        _config: Value,
        _retry: Option<RetryPolicy>,
    ) -> Result<Box<dyn Source>> {
        Ok(Box::new(NoopSource(id.to_owned())))
    }

    /// Transform factory that ignores its config and error policy.
    fn noop_transform(
        id: &str,
        _config: Value,
        _on_error: ErrorPolicy,
    ) -> Result<Box<dyn Transform>> {
        Ok(Box::new(NoopTransform(id.to_owned())))
    }

    /// Sink factory that ignores its config, error policy and retry policy.
    fn noop_sink(
        id: &str,
        _config: Value,
        _on_error: ErrorPolicy,
        _retry: Option<RetryPolicy>,
    ) -> Result<Box<dyn Sink>> {
        Ok(Box::new(NoopSink(id.to_owned())))
    }

    /// Registry with one `"noop"` factory in each category.
    fn noop_registry() -> Registry {
        let mut registry = Registry::default();
        registry.register_source("noop", noop_source).unwrap();
        registry.register_transform("noop", noop_transform).unwrap();
        registry.register_sink("noop", noop_sink).unwrap();
        registry
    }

    /// Source spec of the given kind with an empty config and no retry.
    fn source_spec(kind: &str) -> SourceSpec {
        SourceSpec {
            kind: kind.into(),
            config: json!({}),
            retry: None,
        }
    }

    /// Transform spec of the given kind with an empty config.
    fn transform_spec(kind: &str, on_error: Option<ErrorPolicyConfig>) -> TransformSpec {
        TransformSpec {
            kind: kind.into(),
            config: json!({}),
            on_error,
        }
    }

    /// Sink spec of the given kind with an empty config and no retry.
    fn sink_spec(kind: &str, on_error: Option<ErrorPolicyConfig>) -> SinkSpec {
        SinkSpec {
            kind: kind.into(),
            config: json!({}),
            on_error,
            retry: None,
        }
    }

    /// Config holding exactly one pipeline and no observability settings.
    fn config_with(pipeline: PipelineSpec) -> Config {
        Config {
            observability: None,
            pipelines: vec![pipeline],
        }
    }

    /// Collects kind names and sorts them, since map iteration order varies.
    fn sorted<'a>(kinds: impl Iterator<Item = &'a str>) -> Vec<&'a str> {
        let mut kinds: Vec<_> = kinds.collect();
        kinds.sort_unstable();
        kinds
    }

    #[test]
    fn rejects_duplicate_source_registration() {
        let mut registry = Registry::default();
        registry.register_source("dup", noop_source).unwrap();

        let message = registry
            .register_source("dup", noop_source)
            .unwrap_err()
            .to_string();
        assert!(message.contains("source factory 'dup' already registered"));
    }

    #[test]
    fn rejects_duplicate_transform_registration() {
        let mut registry = Registry::default();
        registry.register_transform("dup", noop_transform).unwrap();

        let message = registry
            .register_transform("dup", noop_transform)
            .unwrap_err()
            .to_string();
        assert!(message.contains("transform factory 'dup' already registered"));
    }

    #[test]
    fn rejects_duplicate_sink_registration() {
        let mut registry = Registry::default();
        registry.register_sink("dup", noop_sink).unwrap();

        let message = registry
            .register_sink("dup", noop_sink)
            .unwrap_err()
            .to_string();
        assert!(message.contains("sink factory 'dup' already registered"));
    }

    #[test]
    fn same_kind_across_categories_does_not_collide() {
        // Each category has its own map, so both registrations must succeed.
        let mut registry = Registry::default();
        registry.register_source("kafka", noop_source).unwrap();
        registry.register_sink("kafka", noop_sink).unwrap();
    }

    #[test]
    fn reports_unknown_source_type() {
        // `.err().expect(..)` rather than `expect_err`: the `Ok` payload is a
        // boxed trait object that is not known to implement `Debug`.
        let err = Registry::default()
            .build_source("p/src", source_spec("missing"))
            .err()
            .expect("expected unknown-kind error");
        assert!(err.to_string().contains("unknown source type 'missing'"));
    }

    #[test]
    fn reports_unknown_transform_type() {
        let err = Registry::default()
            .build_transform("p/t0", transform_spec("missing", None))
            .err()
            .expect("expected unknown-kind error");
        assert!(err.to_string().contains("unknown transform type 'missing'"));
    }

    #[test]
    fn reports_unknown_sink_type() {
        let err = Registry::default()
            .build_sink("p/sink0", sink_spec("missing", None))
            .err()
            .expect("expected unknown-kind error");
        assert!(err.to_string().contains("unknown sink type 'missing'"));
    }

    #[test]
    fn with_builtins_registers_every_builtin_kind() {
        let registry = Registry::with_builtins().unwrap();

        assert_eq!(
            sorted(registry.source_kinds()),
            ["api_poll", "http_webhook", "kafka", "sql_query_poll"]
        );
        assert_eq!(
            sorted(registry.transform_kinds()),
            ["batch", "filter", "mutate", "script", "set_key"]
        );
        assert_eq!(
            sorted(registry.sink_kinds()),
            ["api", "file", "kafka", "sql"]
        );
    }

    #[test]
    fn register_builtin_fails_on_second_call() {
        let mut registry = Registry::default();
        register_builtin(&mut registry).unwrap();

        let second = register_builtin(&mut registry).unwrap_err();
        assert!(second.to_string().contains("already registered"));
    }

    #[test]
    fn build_courier_with_empty_config_yields_zero_pipelines() {
        let courier = noop_registry().build_courier(Config::default()).unwrap();
        assert!(courier.spawn(CancellationToken::new()).is_empty());
    }

    #[test]
    fn build_courier_assigns_hierarchical_node_ids() {
        // Every factory appends the id it was called with to this shared log.
        let ids = Arc::new(Mutex::new(Vec::<String>::new()));
        let mut registry = Registry::default();

        let log = Arc::clone(&ids);
        registry
            .register_source("rec", move |id: &str, _: Value, _: Option<RetryPolicy>| {
                log.lock().unwrap().push(id.to_owned());
                Ok(Box::new(NoopSource(id.to_owned())) as Box<dyn Source>)
            })
            .unwrap();

        let log = Arc::clone(&ids);
        registry
            .register_transform("rec", move |id: &str, _: Value, _: ErrorPolicy| {
                log.lock().unwrap().push(id.to_owned());
                Ok(Box::new(NoopTransform(id.to_owned())) as Box<dyn Transform>)
            })
            .unwrap();

        let log = Arc::clone(&ids);
        registry
            .register_sink(
                "rec",
                move |id: &str, _: Value, _: ErrorPolicy, _: Option<RetryPolicy>| {
                    log.lock().unwrap().push(id.to_owned());
                    Ok(Box::new(NoopSink(id.to_owned())) as Box<dyn Sink>)
                },
            )
            .unwrap();

        registry
            .build_courier(config_with(PipelineSpec {
                name: "my-pipeline".into(),
                source: source_spec("rec"),
                transforms: vec![transform_spec("rec", None), transform_spec("rec", None)],
                sinks: vec![sink_spec("rec", None), sink_spec("rec", None)],
                channel_capacity: None,
            }))
            .unwrap();

        assert_eq!(
            *ids.lock().unwrap(),
            [
                "my-pipeline/src",
                "my-pipeline/t0",
                "my-pipeline/t1",
                "my-pipeline/sink0",
                "my-pipeline/sink1",
            ],
        );
    }

    #[test]
    fn build_courier_wraps_component_errors_with_pipeline_name() {
        let mut registry = Registry::default();
        registry
            .register_source("boom", |_: &str, _: Value, _: Option<RetryPolicy>| {
                Err(anyhow!("source blew up"))
            })
            .unwrap();
        registry.register_sink("noop", noop_sink).unwrap();

        let err = registry
            .build_courier(config_with(PipelineSpec {
                name: "analytics".into(),
                source: source_spec("boom"),
                transforms: vec![],
                sinks: vec![sink_spec("noop", None)],
                channel_capacity: None,
            }))
            .err()
            .expect("expected factory error to propagate");

        // `{:#}` renders the whole context chain on one line.
        let msg = format!("{err:#}");
        for needle in ["pipeline 'analytics'", "source 'boom'", "source blew up"] {
            assert!(msg.contains(needle), "{msg}");
        }
    }

    #[test]
    fn propagates_on_error_to_factories() {
        let transform_policies = Arc::new(Mutex::new(Vec::new()));
        let sink_policies = Arc::new(Mutex::new(Vec::new()));

        let mut registry = Registry::default();
        registry.register_source("noop", noop_source).unwrap();

        let log = Arc::clone(&transform_policies);
        registry
            .register_transform(
                "tracking",
                move |_: &str, _: Value, on_error: ErrorPolicy| {
                    log.lock().unwrap().push(on_error);
                    Ok(Box::new(NoopTransform("t".into())) as Box<dyn Transform>)
                },
            )
            .unwrap();

        let log = Arc::clone(&sink_policies);
        registry
            .register_sink(
                "tracking",
                move |_: &str, _: Value, on_error: ErrorPolicy, _: Option<RetryPolicy>| {
                    log.lock().unwrap().push(on_error);
                    Ok(Box::new(NoopSink("s".into())) as Box<dyn Sink>)
                },
            )
            .unwrap();

        registry
            .build_courier(config_with(PipelineSpec {
                name: "p".into(),
                source: source_spec("noop"),
                transforms: vec![
                    transform_spec("tracking", Some(ErrorPolicyConfig::FailPipeline)),
                    // No explicit policy: the factory should see the default.
                    transform_spec("tracking", None),
                ],
                sinks: vec![sink_spec("tracking", None)],
                channel_capacity: Some(32),
            }))
            .unwrap();

        assert_eq!(
            *transform_policies.lock().unwrap(),
            vec![ErrorPolicy::FailPipeline, ErrorPolicy::Drop],
        );
        assert_eq!(*sink_policies.lock().unwrap(), vec![ErrorPolicy::Drop]);
    }

    #[test]
    fn ignored_channel_capacity_falls_back_to_pipeline_default() {
        // Building must succeed when the spec leaves `channel_capacity` unset.
        noop_registry()
            .build_courier(config_with(PipelineSpec {
                name: "p".into(),
                source: source_spec("noop"),
                transforms: vec![transform_spec("noop", None)],
                sinks: vec![sink_spec("noop", None)],
                channel_capacity: None,
            }))
            .unwrap();
    }
}