Skip to main content

courier/
registry.rs

1//! Runtime component registry.
2//!
3//! Maps short type names (`"kafka"`, `"api_poll"`, …) to factories that
4//! build concrete `Source`, `Transform`, and `Sink` instances from a
5//! `serde_json::Value` spec. Built-ins register via [`register_builtin`];
6//! external crates register more of their own before the `Courier` is
7//! constructed.
8//!
9//! Plugin model (by level):
10//! 1. Built-ins — components shipped in this crate, registered via
11//!    [`register_builtin`].
12//! 2. Statically-linked plugin crates — `my_plugin::register(&mut registry)`
13//!    at startup. First-class native plugin mechanism.
14//! 3. (Future) Dynamically-loaded plugins — crate-boundary via `libloading`
15//!    or an embedded scripting runtime. The factory traits are already
16//!    object-safe, so this is additive.
17
18use std::collections::HashMap;
19
20use anyhow::{Context, Result, bail};
21use serde_json::Value;
22
23use crate::Courier;
24use crate::config::{Config, PipelineSpec, SinkSpec, SourceSpec, TransformSpec, redact_secret};
25use crate::observability::init_metrics;
26use crate::pipeline::{ErrorPolicy, Pipeline};
27use crate::retry::RetryPolicy;
28use crate::sinks::Sink;
29use crate::sources::Source;
30use crate::transforms::Transform;
31
32/// Builds a `Box<dyn Source>` from a JSON spec. Polling sources receive an
33/// optional `RetryPolicy` extracted from `SourceSpec`; push-based sources
34/// (kafka, http_webhook) reject it at factory time so users don't think
35/// retry is doing something it isn't.
36pub trait SourceFactory: Send + Sync {
37    fn build(&self, id: &str, config: Value, retry: Option<RetryPolicy>)
38    -> Result<Box<dyn Source>>;
39}
40
41impl<F> SourceFactory for F
42where
43    F: Fn(&str, Value, Option<RetryPolicy>) -> Result<Box<dyn Source>> + Send + Sync,
44{
45    fn build(
46        &self,
47        id: &str,
48        config: Value,
49        retry: Option<RetryPolicy>,
50    ) -> Result<Box<dyn Source>> {
51        (self)(id, config, retry)
52    }
53}
54
55/// Builds a `Box<dyn Transform>` from a JSON spec. Factories that wrap a
56/// `MapOne` in `BasicTransform` are responsible for applying `on_error`.
57pub trait TransformFactory: Send + Sync {
58    fn build(&self, id: &str, config: Value, on_error: ErrorPolicy) -> Result<Box<dyn Transform>>;
59}
60
61impl<F> TransformFactory for F
62where
63    F: Fn(&str, Value, ErrorPolicy) -> Result<Box<dyn Transform>> + Send + Sync,
64{
65    fn build(&self, id: &str, config: Value, on_error: ErrorPolicy) -> Result<Box<dyn Transform>> {
66        (self)(id, config, on_error)
67    }
68}
69
70/// Builds a `Box<dyn Sink>` from a JSON spec. Factories that wrap a
71/// `WriteOne` in `ManagedSink` are responsible for applying `on_error` and
72/// `retry` — both are extracted from `SinkSpec` by the registry and passed
73/// directly, so no per-sink config parsing is needed for these policies.
74pub trait SinkFactory: Send + Sync {
75    fn build(
76        &self,
77        id: &str,
78        config: Value,
79        on_error: ErrorPolicy,
80        retry: Option<RetryPolicy>,
81    ) -> Result<Box<dyn Sink>>;
82}
83
84impl<F> SinkFactory for F
85where
86    F: Fn(&str, Value, ErrorPolicy, Option<RetryPolicy>) -> Result<Box<dyn Sink>> + Send + Sync,
87{
88    fn build(
89        &self,
90        id: &str,
91        config: Value,
92        on_error: ErrorPolicy,
93        retry: Option<RetryPolicy>,
94    ) -> Result<Box<dyn Sink>> {
95        (self)(id, config, on_error, retry)
96    }
97}
98
99/// Runtime registry of pipeline components. Each category (source,
100/// transform, sink) is a separate namespace — `"kafka"` as a source and
101/// `"kafka"` as a sink coexist without collision. Duplicate `kind`s
102/// within a category are rejected at registration time.
103#[derive(Default)]
104pub struct Registry {
105    source_factories: HashMap<String, Box<dyn SourceFactory>>,
106    transform_factories: HashMap<String, Box<dyn TransformFactory>>,
107    sink_factories: HashMap<String, Box<dyn SinkFactory>>,
108}
109
110impl Registry {
111    /// Convenience constructor: empty registry preloaded with every
112    /// built-in component from this crate. Equivalent to
113    /// `let mut r = Registry::default(); register_builtin(&mut r)?;`.
114    pub fn with_builtins() -> Result<Self> {
115        let mut registry = Self::default();
116        register_builtin(&mut registry)?;
117        Ok(registry)
118    }
119
120    pub fn register_source(
121        &mut self,
122        kind: impl Into<String>,
123        factory: impl SourceFactory + 'static,
124    ) -> Result<()> {
125        let kind = kind.into();
126        if self.source_factories.contains_key(&kind) {
127            bail!("source factory '{kind}' already registered");
128        }
129        self.source_factories.insert(kind, Box::new(factory));
130        Ok(())
131    }
132
133    pub fn register_transform(
134        &mut self,
135        kind: impl Into<String>,
136        factory: impl TransformFactory + 'static,
137    ) -> Result<()> {
138        let kind = kind.into();
139        if self.transform_factories.contains_key(&kind) {
140            bail!("transform factory '{kind}' already registered");
141        }
142        self.transform_factories.insert(kind, Box::new(factory));
143        Ok(())
144    }
145
146    pub fn register_sink(
147        &mut self,
148        kind: impl Into<String>,
149        factory: impl SinkFactory + 'static,
150    ) -> Result<()> {
151        let kind = kind.into();
152        if self.sink_factories.contains_key(&kind) {
153            bail!("sink factory '{kind}' already registered");
154        }
155        self.sink_factories.insert(kind, Box::new(factory));
156        Ok(())
157    }
158
159    pub fn build_source(&self, id: &str, spec: SourceSpec) -> Result<Box<dyn Source>> {
160        let kind = spec.kind;
161        let retry = spec.retry;
162        let factory = self
163            .source_factories
164            .get(&kind)
165            .with_context(|| format!("unknown source type '{}'", redact_secret(&kind)))?;
166        factory
167            .build(id, spec.config, retry)
168            .with_context(|| format!("failed to build source '{}'", redact_secret(&kind)))
169    }
170
171    pub fn build_transform(&self, id: &str, spec: TransformSpec) -> Result<Box<dyn Transform>> {
172        let kind = spec.kind;
173        let on_error = spec.on_error.unwrap_or_default().into();
174        let factory = self
175            .transform_factories
176            .get(&kind)
177            .with_context(|| format!("unknown transform type '{}'", redact_secret(&kind)))?;
178        factory
179            .build(id, spec.config, on_error)
180            .with_context(|| format!("failed to build transform '{}'", redact_secret(&kind)))
181    }
182
183    pub fn build_sink(&self, id: &str, spec: SinkSpec) -> Result<Box<dyn Sink>> {
184        let kind = spec.kind;
185        let on_error = spec.on_error.unwrap_or_default().into();
186        let retry = spec.retry;
187        let factory = self
188            .sink_factories
189            .get(&kind)
190            .with_context(|| format!("unknown sink type '{}'", redact_secret(&kind)))?;
191        factory
192            .build(id, spec.config, on_error, retry)
193            .with_context(|| format!("failed to build sink '{}'", redact_secret(&kind)))
194    }
195
196    /// Builds a whole `Courier` from a `Config`. Mints hierarchical node
197    /// ids (`{pipeline}/src`, `{pipeline}/t{i}`, `{pipeline}/sink{i}`) so
198    /// logs and metrics can be traced back to the pipeline that owns them.
199    pub fn build_courier(&self, config: Config) -> Result<Courier> {
200        config.validate()?;
201
202        let observability = config.observability.clone();
203        let metrics = init_metrics(observability.as_ref())?;
204        let mut pipelines = Vec::with_capacity(config.pipelines.len());
205        for spec in config.pipelines {
206            let name = spec.name.clone();
207            let mut pipeline = self
208                .build_pipeline(spec)
209                .with_context(|| format!("failed to build pipeline '{}'", redact_secret(&name)))?;
210            pipeline = pipeline.with_observability(Some(metrics.clone()));
211            pipelines.push(pipeline);
212        }
213        Ok(Courier::new(pipelines)
214            .with_observability(observability)
215            .with_metrics(metrics))
216    }
217
218    /// Validate `config` and exercise every component factory without
219    /// producing a runtime. `courier validate` uses this so OTLP
220    /// exporters and metric providers are never constructed during a
221    /// pure config check.
222    pub fn dry_run_build(&self, config: Config) -> Result<()> {
223        config.validate()?;
224        for spec in config.pipelines {
225            let name = spec.name.clone();
226            self.build_pipeline(spec)
227                .with_context(|| format!("failed to build pipeline '{}'", redact_secret(&name)))?;
228        }
229        Ok(())
230    }
231
232    fn build_pipeline(&self, spec: PipelineSpec) -> Result<Pipeline> {
233        let name = spec.name;
234        let source = self
235            .build_source(&format!("{name}/src"), spec.source)
236            .with_context(|| format!("pipeline '{}' source", redact_secret(&name)))?;
237
238        let mut pipeline = Pipeline::new(&name, source);
239        if let Some(capacity) = spec.channel_capacity {
240            pipeline = pipeline.with_channel_capacity(capacity);
241        }
242
243        for (i, transform) in spec.transforms.into_iter().enumerate() {
244            let id = format!("{name}/t{i}");
245            pipeline =
246                pipeline.with_transform(self.build_transform(&id, transform).with_context(
247                    || format!("pipeline '{}' transform[{i}]", redact_secret(&name)),
248                )?);
249        }
250
251        for (i, sink) in spec.sinks.into_iter().enumerate() {
252            let id = format!("{name}/sink{i}");
253            pipeline = pipeline.with_sink(
254                self.build_sink(&id, sink)
255                    .with_context(|| format!("pipeline '{}' sink[{i}]", redact_secret(&name)))?,
256            );
257        }
258
259        Ok(pipeline)
260    }
261
262    /// Kinds currently registered for each category. Useful for
263    /// diagnostics or for a `--list-components` CLI flag.
264    pub fn source_kinds(&self) -> impl Iterator<Item = &str> {
265        self.source_factories.keys().map(String::as_str)
266    }
267
268    pub fn transform_kinds(&self) -> impl Iterator<Item = &str> {
269        self.transform_factories.keys().map(String::as_str)
270    }
271
272    pub fn sink_kinds(&self) -> impl Iterator<Item = &str> {
273        self.sink_factories.keys().map(String::as_str)
274    }
275}
276
277/// Registers every built-in source, transform and sink onto `registry`.
278/// Each factory lives next to its component in the module tree, so this
279/// function is purely orchestration — the concrete construction logic
280/// and `*Config` structs are defined alongside their components.
281///
282/// Errors if any of the built-in `kind`s is already registered — callers
283/// that want to override a built-in should do so *after* this call.
284pub fn register_builtin(registry: &mut Registry) -> Result<()> {
285    registry.register_source("api_poll", crate::sources::api::api_poll_source_factory)?;
286    registry.register_source(
287        "http_webhook",
288        crate::sources::http_webhook::http_webhook_source_factory,
289    )?;
290    registry.register_source("kafka", crate::sources::kafka::kafka_source_factory)?;
291    registry.register_source(
292        "sql_query_poll",
293        crate::sources::sql::sql_query_poll_source_factory,
294    )?;
295    registry.register_transform("batch", crate::transforms::batch::batch_transform_factory)?;
296    registry.register_transform(
297        "filter",
298        crate::transforms::filter::filter_transform_factory,
299    )?;
300    registry.register_transform(
301        "mutate",
302        crate::transforms::mutate::mutate_transform_factory,
303    )?;
304    registry.register_transform(
305        "script",
306        crate::transforms::script::script_transform_factory,
307    )?;
308    registry.register_transform(
309        "set_key",
310        crate::transforms::set_key::set_key_transform_factory,
311    )?;
312    registry.register_sink("api", crate::sinks::api::api_sink_factory)?;
313    registry.register_sink("file", crate::sinks::file::file_sink_factory)?;
314    registry.register_sink("kafka", crate::sinks::kafka::kafka_sink_factory)?;
315    registry.register_sink("sql", crate::sinks::sql::sql_sink_factory)?;
316    Ok(())
317}
318
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use anyhow::anyhow;
    use async_trait::async_trait;
    use serde_json::{Value, json};
    use tokio::sync::mpsc::{Receiver, Sender};
    use tokio_util::sync::CancellationToken;

    use super::*;
    use crate::config::{ErrorPolicyConfig, PipelineSpec, SinkSpec, SourceSpec, TransformSpec};
    use crate::envelope::Envelope;
    use crate::retry::RetryPolicy;

    // -----------------------------------------------------------------
    // Stub components: they remember their node id and return at once.
    // -----------------------------------------------------------------

    struct NoopSource(String);

    #[async_trait]
    impl Source for NoopSource {
        fn id(&self) -> &str {
            self.0.as_str()
        }

        async fn run(self: Box<Self>, _tx: Sender<Envelope>, _cancel: CancellationToken) {}
    }

    struct NoopTransform(String);

    #[async_trait]
    impl Transform for NoopTransform {
        fn id(&self) -> &str {
            self.0.as_str()
        }

        async fn run(
            self: Box<Self>,
            _rx: Receiver<Envelope>,
            _tx: Sender<Envelope>,
            _cancel: CancellationToken,
        ) {
        }
    }

    struct NoopSink(String);

    #[async_trait]
    impl Sink for NoopSink {
        fn id(&self) -> &str {
            self.0.as_str()
        }

        async fn run(self: Box<Self>, _rx: Receiver<Envelope>, _cancel: CancellationToken) {}
    }

    // -----------------------------------------------------------------
    // Stub factories and spec builders
    // -----------------------------------------------------------------

    fn noop_source(
        id: &str,
        _config: Value,
        _retry: Option<RetryPolicy>,
    ) -> Result<Box<dyn Source>> {
        Ok(Box::new(NoopSource(id.to_owned())))
    }

    fn noop_transform(
        id: &str,
        _config: Value,
        _on_error: ErrorPolicy,
    ) -> Result<Box<dyn Transform>> {
        Ok(Box::new(NoopTransform(id.to_owned())))
    }

    fn noop_sink(
        id: &str,
        _config: Value,
        _on_error: ErrorPolicy,
        _retry: Option<RetryPolicy>,
    ) -> Result<Box<dyn Sink>> {
        Ok(Box::new(NoopSink(id.to_owned())))
    }

    /// Registry holding a `noop` source, transform and sink — for tests
    /// that exercise pipeline wiring rather than component behavior.
    fn noop_registry() -> Registry {
        let mut registry = Registry::default();
        registry.register_source("noop", noop_source).unwrap();
        registry.register_transform("noop", noop_transform).unwrap();
        registry.register_sink("noop", noop_sink).unwrap();
        registry
    }

    /// Source spec of the given `kind` with an empty config and no retry.
    fn source_spec(kind: &str) -> SourceSpec {
        SourceSpec {
            kind: kind.into(),
            config: json!({}),
            retry: None,
        }
    }

    /// Transform spec of the given `kind` with an empty config.
    fn transform_spec(kind: &str, on_error: Option<ErrorPolicyConfig>) -> TransformSpec {
        TransformSpec {
            kind: kind.into(),
            config: json!({}),
            on_error,
        }
    }

    /// Sink spec of the given `kind` with an empty config and no retry.
    fn sink_spec(kind: &str, on_error: Option<ErrorPolicyConfig>) -> SinkSpec {
        SinkSpec {
            kind: kind.into(),
            config: json!({}),
            on_error,
            retry: None,
        }
    }

    /// Config holding exactly one pipeline and no observability section.
    fn single_pipeline_config(pipeline: PipelineSpec) -> Config {
        Config {
            observability: None,
            pipelines: vec![pipeline],
        }
    }

    // -----------------------------------------------------------------
    // Registration
    // -----------------------------------------------------------------

    #[test]
    fn rejects_duplicate_source_registration() {
        let mut registry = Registry::default();
        registry.register_source("dup", noop_source).unwrap();

        let message = registry
            .register_source("dup", noop_source)
            .unwrap_err()
            .to_string();
        assert!(message.contains("source factory 'dup' already registered"));
    }

    #[test]
    fn rejects_duplicate_transform_registration() {
        let mut registry = Registry::default();
        registry.register_transform("dup", noop_transform).unwrap();

        let message = registry
            .register_transform("dup", noop_transform)
            .unwrap_err()
            .to_string();
        assert!(message.contains("transform factory 'dup' already registered"));
    }

    #[test]
    fn rejects_duplicate_sink_registration() {
        let mut registry = Registry::default();
        registry.register_sink("dup", noop_sink).unwrap();

        let message = registry
            .register_sink("dup", noop_sink)
            .unwrap_err()
            .to_string();
        assert!(message.contains("sink factory 'dup' already registered"));
    }

    #[test]
    fn same_kind_across_categories_does_not_collide() {
        // Categories are separate namespaces, so "kafka" may be both a
        // source and a sink.
        let mut registry = Registry::default();
        registry.register_source("kafka", noop_source).unwrap();
        registry.register_sink("kafka", noop_sink).unwrap();
    }

    // -----------------------------------------------------------------
    // Lookup errors
    // -----------------------------------------------------------------

    #[test]
    fn reports_unknown_source_type() {
        let registry = Registry::default();
        let err = registry
            .build_source("p/src", source_spec("missing"))
            .err()
            .expect("expected unknown-kind error");
        assert!(err.to_string().contains("unknown source type 'missing'"));
    }

    #[test]
    fn reports_unknown_transform_type() {
        let registry = Registry::default();
        let err = registry
            .build_transform("p/t0", transform_spec("missing", None))
            .err()
            .expect("expected unknown-kind error");
        assert!(err.to_string().contains("unknown transform type 'missing'"));
    }

    #[test]
    fn reports_unknown_sink_type() {
        let registry = Registry::default();
        let err = registry
            .build_sink("p/sink0", sink_spec("missing", None))
            .err()
            .expect("expected unknown-kind error");
        assert!(err.to_string().contains("unknown sink type 'missing'"));
    }

    // -----------------------------------------------------------------
    // Built-ins
    // -----------------------------------------------------------------

    #[test]
    fn with_builtins_registers_every_builtin_kind() {
        let registry = Registry::with_builtins().unwrap();

        // The kind iterators have no defined order, so sort first.
        let mut sources: Vec<_> = registry.source_kinds().collect();
        sources.sort_unstable();
        assert_eq!(
            sources,
            ["api_poll", "http_webhook", "kafka", "sql_query_poll"]
        );

        let mut transforms: Vec<_> = registry.transform_kinds().collect();
        transforms.sort_unstable();
        assert_eq!(
            transforms,
            ["batch", "filter", "mutate", "script", "set_key"]
        );

        let mut sinks: Vec<_> = registry.sink_kinds().collect();
        sinks.sort_unstable();
        assert_eq!(sinks, ["api", "file", "kafka", "sql"]);
    }

    #[test]
    fn register_builtin_fails_on_second_call() {
        let mut registry = Registry::default();
        register_builtin(&mut registry).unwrap();

        let message = register_builtin(&mut registry).unwrap_err().to_string();
        assert!(message.contains("already registered"));
    }

    // Per-factory behavior (spec parsing, error messages) is tested next
    // to each component — e.g. `transforms::set_key::tests`. The registry
    // tests here only cover lookup and wiring concerns.

    // -----------------------------------------------------------------
    // build_courier
    // -----------------------------------------------------------------

    #[test]
    fn build_courier_with_empty_config_yields_zero_pipelines() {
        let courier = noop_registry().build_courier(Config::default()).unwrap();

        // Courier's fields are private, so observe the pipeline count
        // through the handles returned by `spawn`. Run-time behavior is
        // tested elsewhere.
        let handles = courier.spawn(CancellationToken::new());
        assert!(handles.is_empty());
    }

    #[test]
    fn build_courier_assigns_hierarchical_node_ids() {
        // Each factory logs the id it is given so the scheme can be checked.
        let ids = Arc::new(Mutex::new(Vec::<String>::new()));
        let mut registry = Registry::default();

        let source_log = Arc::clone(&ids);
        registry
            .register_source("rec", move |id: &str, _: Value, _: Option<RetryPolicy>| {
                source_log.lock().unwrap().push(id.to_string());
                Ok(Box::new(NoopSource(id.into())) as Box<dyn Source>)
            })
            .unwrap();

        let transform_log = Arc::clone(&ids);
        registry
            .register_transform("rec", move |id: &str, _: Value, _: ErrorPolicy| {
                transform_log.lock().unwrap().push(id.to_string());
                Ok(Box::new(NoopTransform(id.into())) as Box<dyn Transform>)
            })
            .unwrap();

        let sink_log = Arc::clone(&ids);
        registry
            .register_sink(
                "rec",
                move |id: &str, _: Value, _: ErrorPolicy, _: Option<RetryPolicy>| {
                    sink_log.lock().unwrap().push(id.to_string());
                    Ok(Box::new(NoopSink(id.into())) as Box<dyn Sink>)
                },
            )
            .unwrap();

        registry
            .build_courier(single_pipeline_config(PipelineSpec {
                name: "my-pipeline".into(),
                source: source_spec("rec"),
                transforms: vec![transform_spec("rec", None), transform_spec("rec", None)],
                sinks: vec![sink_spec("rec", None), sink_spec("rec", None)],
                channel_capacity: None,
            }))
            .unwrap();

        let seen = ids.lock().unwrap().clone();
        assert_eq!(
            seen,
            [
                "my-pipeline/src",
                "my-pipeline/t0",
                "my-pipeline/t1",
                "my-pipeline/sink0",
                "my-pipeline/sink1",
            ],
        );
    }

    #[test]
    fn build_courier_wraps_component_errors_with_pipeline_name() {
        let mut registry = Registry::default();
        registry
            .register_source("boom", |_: &str, _: Value, _: Option<RetryPolicy>| {
                Err(anyhow!("source blew up"))
            })
            .unwrap();
        registry.register_sink("noop", noop_sink).unwrap();

        let err = registry
            .build_courier(single_pipeline_config(PipelineSpec {
                name: "analytics".into(),
                source: source_spec("boom"),
                transforms: vec![],
                sinks: vec![sink_spec("noop", None)],
                channel_capacity: None,
            }))
            .err()
            .expect("expected factory error to propagate");

        // `{:#}` renders the whole context chain on one line.
        let msg = format!("{err:#}");
        for fragment in ["pipeline 'analytics'", "source 'boom'", "source blew up"] {
            assert!(msg.contains(fragment), "{msg}");
        }
    }

    #[test]
    fn propagates_on_error_to_factories() {
        let mut registry = Registry::default();
        registry.register_source("noop", noop_source).unwrap();

        let transform_policies = Arc::new(Mutex::new(Vec::<ErrorPolicy>::new()));
        let transform_log = Arc::clone(&transform_policies);
        registry
            .register_transform(
                "tracking",
                move |_: &str, _: Value, on_error: ErrorPolicy| {
                    transform_log.lock().unwrap().push(on_error);
                    Ok(Box::new(NoopTransform("t".into())) as Box<dyn Transform>)
                },
            )
            .unwrap();

        let sink_policies = Arc::new(Mutex::new(Vec::<ErrorPolicy>::new()));
        let sink_log = Arc::clone(&sink_policies);
        registry
            .register_sink(
                "tracking",
                move |_: &str, _: Value, on_error: ErrorPolicy, _: Option<RetryPolicy>| {
                    sink_log.lock().unwrap().push(on_error);
                    Ok(Box::new(NoopSink("s".into())) as Box<dyn Sink>)
                },
            )
            .unwrap();

        registry
            .build_courier(single_pipeline_config(PipelineSpec {
                name: "p".into(),
                source: source_spec("noop"),
                transforms: vec![
                    transform_spec("tracking", Some(ErrorPolicyConfig::FailPipeline)),
                    // No explicit policy: expected to arrive as `Drop`.
                    transform_spec("tracking", None),
                ],
                sinks: vec![sink_spec("tracking", None)],
                channel_capacity: Some(32),
            }))
            .unwrap();

        assert_eq!(
            *transform_policies.lock().unwrap(),
            vec![ErrorPolicy::FailPipeline, ErrorPolicy::Drop],
        );
        assert_eq!(*sink_policies.lock().unwrap(), vec![ErrorPolicy::Drop]);
    }

    #[test]
    fn ignored_channel_capacity_falls_back_to_pipeline_default() {
        // Smoke test: leaving `channel_capacity` unset must not break
        // build_courier. The default itself is enforced inside
        // Pipeline::new.
        noop_registry()
            .build_courier(single_pipeline_config(PipelineSpec {
                name: "p".into(),
                source: source_spec("noop"),
                transforms: vec![transform_spec("noop", None)],
                sinks: vec![sink_spec("noop", None)],
                channel_capacity: None,
            }))
            .unwrap();
    }
}