Skip to main content

danube_connect_core/
route.rs

1use crate::config::{ConsumerConfig, ProducerConfig, SchemaConfig, SubscriptionType};
2
3/// Schema-related routing policy shared by sink and source routes.
4#[derive(Debug, Clone)]
5pub struct RouteSchemaPolicy {
6    /// Expected input schema subject for sink-side validation.
7    pub expected_subject: Option<String>,
8    /// Output schema configuration used when producing records.
9    pub output_schema: Option<SchemaConfig>,
10}
11
12impl RouteSchemaPolicy {
13    /// Create a schema policy with no expected subject and no output schema.
14    pub fn none() -> Self {
15        Self {
16            expected_subject: None,
17            output_schema: None,
18        }
19    }
20}
21
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// A `ConsumerConfig` converted to a sink route and back must keep every field.
    #[test]
    fn test_sink_route_roundtrip() {
        let config = ConsumerConfig {
            topic: "/default/events".to_string(),
            consumer_name: "events-consumer".to_string(),
            subscription: "events-sub".to_string(),
            subscription_type: SubscriptionType::Shared,
            expected_schema_subject: Some("events-value".to_string()),
        };

        let route = config.route();
        assert_eq!(route.topic, "/default/events");
        assert_eq!(route.subscription.consumer_name, "events-consumer");
        assert_eq!(route.subscription.subscription, "events-sub");
        assert!(matches!(
            route.subscription.subscription_type,
            SubscriptionType::Shared
        ));
        assert_eq!(
            route.schema.expected_subject.as_deref(),
            Some("events-value")
        );
        // Sink routes never carry an output schema.
        assert!(route.schema.output_schema.is_none());

        let roundtrip = ConsumerConfig::from_route(route);
        assert_eq!(roundtrip.topic, config.topic);
        assert_eq!(roundtrip.consumer_name, config.consumer_name);
        assert_eq!(roundtrip.subscription, config.subscription);
        assert!(matches!(
            roundtrip.subscription_type,
            SubscriptionType::Shared
        ));
        assert_eq!(
            roundtrip.expected_schema_subject,
            Some("events-value".to_string())
        );
    }

    /// A `ProducerConfig` converted to a source route and back must keep every
    /// field, including each field of the nested schema configuration.
    #[test]
    fn test_source_route_roundtrip() {
        let config = ProducerConfig {
            topic: "/default/output".to_string(),
            partitions: 4,
            reliable_dispatch: true,
            schema_config: Some(SchemaConfig {
                subject: "output-value".to_string(),
                schema_type: "json_schema".to_string(),
                schema_file: PathBuf::from("schemas/output.json"),
                auto_register: true,
                version_strategy: crate::VersionStrategy::Pinned(2),
            }),
        };

        let route = config.route();
        assert_eq!(route.topic, "/default/output");
        assert_eq!(route.dispatch.partitions, 4);
        assert!(route.dispatch.reliable_dispatch);
        // Source routes never carry an expected input subject.
        assert!(route.schema.expected_subject.is_none());
        let routed_schema = route
            .schema
            .output_schema
            .as_ref()
            .expect("source route should carry the producer schema");
        assert_eq!(routed_schema.subject, "output-value");

        let roundtrip = ProducerConfig::from_route(route);
        assert_eq!(roundtrip.topic, config.topic);
        assert_eq!(roundtrip.partitions, config.partitions);
        assert_eq!(roundtrip.reliable_dispatch, config.reliable_dispatch);

        // Check every schema field, not just the subject, so a lossy
        // conversion cannot slip through unnoticed.
        let schema = roundtrip
            .schema_config
            .as_ref()
            .expect("schema config should survive the roundtrip");
        assert_eq!(schema.subject, "output-value");
        assert_eq!(schema.schema_type, "json_schema");
        assert_eq!(schema.schema_file, PathBuf::from("schemas/output.json"));
        assert!(schema.auto_register);
        assert!(matches!(
            schema.version_strategy,
            crate::VersionStrategy::Pinned(2)
        ));
    }
}
101
102impl From<ConsumerConfig> for SinkRoute {
103    fn from(config: ConsumerConfig) -> Self {
104        Self {
105            topic: config.topic,
106            subscription: RouteSubscriptionPolicy {
107                consumer_name: config.consumer_name,
108                subscription: config.subscription,
109                subscription_type: config.subscription_type,
110            },
111            schema: RouteSchemaPolicy {
112                expected_subject: config.expected_schema_subject,
113                output_schema: None,
114            },
115        }
116    }
117}
118
119impl From<SinkRoute> for ConsumerConfig {
120    fn from(route: SinkRoute) -> Self {
121        Self {
122            topic: route.topic,
123            consumer_name: route.subscription.consumer_name,
124            subscription: route.subscription.subscription,
125            subscription_type: route.subscription.subscription_type,
126            expected_schema_subject: route.schema.expected_subject,
127        }
128    }
129}
130
131impl From<ProducerConfig> for SourceRoute {
132    fn from(config: ProducerConfig) -> Self {
133        Self {
134            topic: config.topic,
135            dispatch: RouteDispatchPolicy {
136                partitions: config.partitions,
137                reliable_dispatch: config.reliable_dispatch,
138            },
139            schema: RouteSchemaPolicy {
140                expected_subject: None,
141                output_schema: config.schema_config,
142            },
143        }
144    }
145}
146
147impl From<SourceRoute> for ProducerConfig {
148    fn from(route: SourceRoute) -> Self {
149        Self {
150            topic: route.topic,
151            partitions: route.dispatch.partitions,
152            reliable_dispatch: route.dispatch.reliable_dispatch,
153            schema_config: route.schema.output_schema,
154        }
155    }
156}
157
158/// Subscription behavior for a sink route.
159#[derive(Debug, Clone)]
160pub struct RouteSubscriptionPolicy {
161    /// Consumer name used when creating the Danube consumer.
162    pub consumer_name: String,
163    /// Subscription name shared by consumers of the same route.
164    pub subscription: String,
165    /// Danube subscription type for this route.
166    pub subscription_type: SubscriptionType,
167}
168
169impl RouteSubscriptionPolicy {
170    /// Create a subscription policy from explicit consumer settings.
171    pub fn new(
172        consumer_name: impl Into<String>,
173        subscription: impl Into<String>,
174        subscription_type: SubscriptionType,
175    ) -> Self {
176        Self {
177            consumer_name: consumer_name.into(),
178            subscription: subscription.into(),
179            subscription_type,
180        }
181    }
182}
183
/// Dispatch behavior for a source route.
///
/// Implements `PartialEq`/`Eq` so two policies can be compared directly,
/// for example with `assert_eq!` in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RouteDispatchPolicy {
    /// Number of partitions to configure for the destination topic.
    pub partitions: usize,
    /// Whether the route should use reliable dispatch semantics.
    pub reliable_dispatch: bool,
}

impl RouteDispatchPolicy {
    /// Create a dispatch policy for a source route.
    ///
    /// `partitions` and `reliable_dispatch` are stored unchanged; no
    /// validation is performed here.
    pub fn new(partitions: usize, reliable_dispatch: bool) -> Self {
        Self {
            partitions,
            reliable_dispatch,
        }
    }
}
202
203/// Complete sink-side routing definition derived from `ConsumerConfig`.
204#[derive(Debug, Clone)]
205pub struct SinkRoute {
206    /// Danube topic consumed by this route.
207    pub topic: String,
208    /// Subscription policy used to create the consumer.
209    pub subscription: RouteSubscriptionPolicy,
210    /// Schema expectations applied to consumed records.
211    pub schema: RouteSchemaPolicy,
212}
213
214impl SinkRoute {
215    /// Create a sink route from its topic, subscription, and schema policy.
216    pub fn new(
217        topic: impl Into<String>,
218        subscription: RouteSubscriptionPolicy,
219        schema: RouteSchemaPolicy,
220    ) -> Self {
221        Self {
222            topic: topic.into(),
223            subscription,
224            schema,
225        }
226    }
227}
228
229/// Complete source-side routing definition derived from `ProducerConfig`.
230#[derive(Debug, Clone)]
231pub struct SourceRoute {
232    /// Danube topic produced by this route.
233    pub topic: String,
234    /// Dispatch policy used when creating the producer.
235    pub dispatch: RouteDispatchPolicy,
236    /// Schema configuration applied when publishing records.
237    pub schema: RouteSchemaPolicy,
238}
239
240impl SourceRoute {
241    /// Create a source route from its topic, dispatch, and schema policy.
242    pub fn new(
243        topic: impl Into<String>,
244        dispatch: RouteDispatchPolicy,
245        schema: RouteSchemaPolicy,
246    ) -> Self {
247        Self {
248            topic: topic.into(),
249            dispatch,
250            schema,
251        }
252    }
253}