1use crate::config::{ConsumerConfig, ProducerConfig, SchemaConfig, SubscriptionType};
2
3#[derive(Debug, Clone)]
5pub struct RouteSchemaPolicy {
6 pub expected_subject: Option<String>,
8 pub output_schema: Option<SchemaConfig>,
10}
11
12impl RouteSchemaPolicy {
13 pub fn none() -> Self {
15 Self {
16 expected_subject: None,
17 output_schema: None,
18 }
19 }
20}
21
// NOTE(review): this test module is followed by further items in the file;
// clippy's `items_after_test_module` lint flags that layout. Consider moving
// the module to the end of the file.
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Converts a `ConsumerConfig` into a route via `route()` and back via
    /// `from_route`, checking that every field is carried across both ways.
    #[test]
    fn test_sink_route_roundtrip() {
        let original = ConsumerConfig {
            topic: String::from("/default/events"),
            consumer_name: String::from("events-consumer"),
            subscription: String::from("events-sub"),
            subscription_type: SubscriptionType::Shared,
            expected_schema_subject: Some(String::from("events-value")),
        };

        // Config -> route.
        let sink = original.route();
        assert_eq!(sink.topic, "/default/events");
        assert_eq!(sink.subscription.consumer_name, "events-consumer");
        assert_eq!(sink.subscription.subscription, "events-sub");
        assert!(matches!(
            sink.subscription.subscription_type,
            SubscriptionType::Shared
        ));
        assert_eq!(
            sink.schema.expected_subject,
            Some(String::from("events-value"))
        );
        assert!(sink.schema.output_schema.is_none());

        // Route -> config.
        let restored = ConsumerConfig::from_route(sink);
        assert_eq!(restored.topic, original.topic);
        assert_eq!(restored.consumer_name, original.consumer_name);
        assert_eq!(restored.subscription, original.subscription);
        assert!(matches!(
            restored.subscription_type,
            SubscriptionType::Shared
        ));
        assert_eq!(
            restored.expected_schema_subject.as_deref(),
            Some("events-value")
        );
    }

    /// Converts a `ProducerConfig` into a route via `route()` and back via
    /// `from_route`, checking the dispatch fields and the schema subject.
    #[test]
    fn test_source_route_roundtrip() {
        let output_schema = SchemaConfig {
            subject: String::from("output-value"),
            schema_type: String::from("json_schema"),
            schema_file: PathBuf::from("schemas/output.json"),
            auto_register: true,
            version_strategy: crate::VersionStrategy::Pinned(2),
        };
        let original = ProducerConfig {
            topic: String::from("/default/output"),
            partitions: 4,
            reliable_dispatch: true,
            schema_config: Some(output_schema),
        };

        // Config -> route.
        let source = original.route();
        assert_eq!(source.topic, "/default/output");
        assert_eq!(source.dispatch.partitions, 4);
        assert!(source.dispatch.reliable_dispatch);
        assert!(source.schema.expected_subject.is_none());
        assert!(source.schema.output_schema.is_some());

        // Route -> config.
        let restored = ProducerConfig::from_route(source);
        assert_eq!(restored.topic, original.topic);
        assert_eq!(restored.partitions, original.partitions);
        assert_eq!(restored.reliable_dispatch, original.reliable_dispatch);
        assert!(restored.schema_config.is_some());

        let restored_subject = restored
            .schema_config
            .as_ref()
            .map(|schema| schema.subject.as_str());
        assert_eq!(restored_subject, Some("output-value"));
    }
}
101
102impl From<ConsumerConfig> for SinkRoute {
103 fn from(config: ConsumerConfig) -> Self {
104 Self {
105 topic: config.topic,
106 subscription: RouteSubscriptionPolicy {
107 consumer_name: config.consumer_name,
108 subscription: config.subscription,
109 subscription_type: config.subscription_type,
110 },
111 schema: RouteSchemaPolicy {
112 expected_subject: config.expected_schema_subject,
113 output_schema: None,
114 },
115 }
116 }
117}
118
119impl From<SinkRoute> for ConsumerConfig {
120 fn from(route: SinkRoute) -> Self {
121 Self {
122 topic: route.topic,
123 consumer_name: route.subscription.consumer_name,
124 subscription: route.subscription.subscription,
125 subscription_type: route.subscription.subscription_type,
126 expected_schema_subject: route.schema.expected_subject,
127 }
128 }
129}
130
131impl From<ProducerConfig> for SourceRoute {
132 fn from(config: ProducerConfig) -> Self {
133 Self {
134 topic: config.topic,
135 dispatch: RouteDispatchPolicy {
136 partitions: config.partitions,
137 reliable_dispatch: config.reliable_dispatch,
138 },
139 schema: RouteSchemaPolicy {
140 expected_subject: None,
141 output_schema: config.schema_config,
142 },
143 }
144 }
145}
146
147impl From<SourceRoute> for ProducerConfig {
148 fn from(route: SourceRoute) -> Self {
149 Self {
150 topic: route.topic,
151 partitions: route.dispatch.partitions,
152 reliable_dispatch: route.dispatch.reliable_dispatch,
153 schema_config: route.schema.output_schema,
154 }
155 }
156}
157
158#[derive(Debug, Clone)]
160pub struct RouteSubscriptionPolicy {
161 pub consumer_name: String,
163 pub subscription: String,
165 pub subscription_type: SubscriptionType,
167}
168
169impl RouteSubscriptionPolicy {
170 pub fn new(
172 consumer_name: impl Into<String>,
173 subscription: impl Into<String>,
174 subscription_type: SubscriptionType,
175 ) -> Self {
176 Self {
177 consumer_name: consumer_name.into(),
178 subscription: subscription.into(),
179 subscription_type,
180 }
181 }
182}
183
/// Dispatch settings carried by a [`SourceRoute`].
///
/// Both fields mirror the `ProducerConfig` fields of the same name; the `From`
/// conversions in this file copy them across unchanged.
///
/// The type holds only a `usize` and a `bool`, so it derives `PartialEq` and
/// `Eq`; whole policies can be compared directly instead of field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RouteDispatchPolicy {
    /// Partition count.
    pub partitions: usize,
    /// Reliable-dispatch flag.
    pub reliable_dispatch: bool,
}

impl RouteDispatchPolicy {
    /// Creates a policy from a partition count and a reliable-dispatch flag.
    pub fn new(partitions: usize, reliable_dispatch: bool) -> Self {
        Self {
            partitions,
            reliable_dispatch,
        }
    }
}
202
203#[derive(Debug, Clone)]
205pub struct SinkRoute {
206 pub topic: String,
208 pub subscription: RouteSubscriptionPolicy,
210 pub schema: RouteSchemaPolicy,
212}
213
214impl SinkRoute {
215 pub fn new(
217 topic: impl Into<String>,
218 subscription: RouteSubscriptionPolicy,
219 schema: RouteSchemaPolicy,
220 ) -> Self {
221 Self {
222 topic: topic.into(),
223 subscription,
224 schema,
225 }
226 }
227}
228
229#[derive(Debug, Clone)]
231pub struct SourceRoute {
232 pub topic: String,
234 pub dispatch: RouteDispatchPolicy,
236 pub schema: RouteSchemaPolicy,
238}
239
240impl SourceRoute {
241 pub fn new(
243 topic: impl Into<String>,
244 dispatch: RouteDispatchPolicy,
245 schema: RouteSchemaPolicy,
246 ) -> Self {
247 Self {
248 topic: topic.into(),
249 dispatch,
250 schema,
251 }
252 }
253}