1use switchback_codec_pb::canardleteer::switchback::protocol::amqp::v1alpha1::AmqpPayload;
4use switchback_codec_pb::canardleteer::switchback::protocol::amqp::v1alpha1::__buffa::oneof::amqp_payload::Kind as AmqpKind;
5use switchback_codec_pb::canardleteer::switchback::protocol::grpc::v1alpha1::GrpcPayload;
6use switchback_codec_pb::canardleteer::switchback::protocol::grpc::v1alpha1::__buffa::oneof::grpc_payload::Kind as GrpcKind;
7use switchback_codec_pb::canardleteer::switchback::protocol::http::v1alpha1::HttpPayload;
8use switchback_codec_pb::canardleteer::switchback::protocol::http::v1alpha1::__buffa::oneof::http_payload::Kind as HttpKind;
9use switchback_codec_pb::canardleteer::switchback::protocol::kafka::v1alpha1::KafkaPayload;
10use switchback_codec_pb::canardleteer::switchback::protocol::kafka::v1alpha1::__buffa::oneof::kafka_payload::Kind as KafkaKind;
11use switchback_codec_pb::canardleteer::switchback::protocol::mqtt::v1alpha1::MqttPayload;
12use switchback_codec_pb::canardleteer::switchback::protocol::mqtt::v1alpha1::__buffa::oneof::mqtt_payload::Kind as MqttKind;
13use switchback_traits::{ProtocolAttachment, Result, SwitchbackError};
14
15use crate::amqp::AmqpProtocol;
16use crate::grpc::GrpcProtocol;
17use crate::http::HttpProtocol;
18use crate::kafka::KafkaProtocol;
19use crate::mqtt::MqttProtocol;
20use crate::wire::decode_message;
21
22#[derive(Clone, Debug, PartialEq)]
24pub enum HttpPayloadKind {
25 Contract(
27 switchback_codec_pb::canardleteer::switchback::protocol::http::v1alpha1::HttpContractMeta,
28 ),
29 Operation(
31 switchback_codec_pb::canardleteer::switchback::protocol::http::v1alpha1::HttpOperationMeta,
32 ),
33 Response(
35 switchback_codec_pb::canardleteer::switchback::protocol::http::v1alpha1::HttpResponseMeta,
36 ),
37 Error(switchback_codec_pb::canardleteer::switchback::protocol::http::v1alpha1::HttpErrorMeta),
39 Parameter(
41 switchback_codec_pb::canardleteer::switchback::protocol::http::v1alpha1::HttpParameterMeta,
42 ),
43}
44
45#[derive(Clone, Debug, PartialEq)]
47pub enum GrpcPayloadKind {
48 Contract(
50 switchback_codec_pb::canardleteer::switchback::protocol::grpc::v1alpha1::GrpcContractMeta,
51 ),
52 Operation(
54 switchback_codec_pb::canardleteer::switchback::protocol::grpc::v1alpha1::GrpcOperationMeta,
55 ),
56 Status(switchback_codec_pb::canardleteer::switchback::protocol::grpc::v1alpha1::GrpcStatusMeta),
58 Error(switchback_codec_pb::canardleteer::switchback::protocol::grpc::v1alpha1::GrpcErrorMeta),
60 Metadata(
62 switchback_codec_pb::canardleteer::switchback::protocol::grpc::v1alpha1::GrpcMetadataMeta,
63 ),
64}
65
66#[derive(Clone, Debug, PartialEq)]
68pub enum KafkaPayloadKind {
69 Contract(
71 switchback_codec_pb::canardleteer::switchback::protocol::kafka::v1alpha1::KafkaContractMeta,
72 ),
73 Channel(
75 switchback_codec_pb::canardleteer::switchback::protocol::kafka::v1alpha1::KafkaChannelMeta,
76 ),
77 Operation(
79 switchback_codec_pb::canardleteer::switchback::protocol::kafka::v1alpha1::KafkaOperationMeta,
80 ),
81 Message(
83 switchback_codec_pb::canardleteer::switchback::protocol::kafka::v1alpha1::KafkaMessageMeta,
84 ),
85}
86
87#[derive(Clone, Debug, PartialEq)]
89pub enum AmqpPayloadKind {
90 Contract(
92 switchback_codec_pb::canardleteer::switchback::protocol::amqp::v1alpha1::AmqpContractMeta,
93 ),
94 Channel(
96 switchback_codec_pb::canardleteer::switchback::protocol::amqp::v1alpha1::AmqpChannelMeta,
97 ),
98 Operation(
100 switchback_codec_pb::canardleteer::switchback::protocol::amqp::v1alpha1::AmqpOperationMeta,
101 ),
102 Message(
104 switchback_codec_pb::canardleteer::switchback::protocol::amqp::v1alpha1::AmqpMessageMeta,
105 ),
106}
107
108#[derive(Clone, Debug, PartialEq)]
110pub enum MqttPayloadKind {
111 Contract(
113 switchback_codec_pb::canardleteer::switchback::protocol::mqtt::v1alpha1::MqttContractMeta,
114 ),
115 Channel(
117 switchback_codec_pb::canardleteer::switchback::protocol::mqtt::v1alpha1::MqttChannelMeta,
118 ),
119 Operation(
121 switchback_codec_pb::canardleteer::switchback::protocol::mqtt::v1alpha1::MqttOperationMeta,
122 ),
123 Message(
125 switchback_codec_pb::canardleteer::switchback::protocol::mqtt::v1alpha1::MqttMessageMeta,
126 ),
127}
128
129#[derive(Clone, Debug, PartialEq)]
131pub enum DecodedAttachment {
132 Http(HttpPayloadKind),
134 Grpc(GrpcPayloadKind),
136 Kafka(KafkaPayloadKind),
138 Amqp(AmqpPayloadKind),
140 Mqtt(MqttPayloadKind),
142 Opaque {
144 protocol_id: String,
146 payload: Vec<u8>,
148 },
149}
150
151#[derive(Clone, Debug, Default)]
153pub struct ProtocolRegistry {
154 http: HttpProtocol,
155 grpc: GrpcProtocol,
156 kafka: KafkaProtocol,
157 amqp: AmqpProtocol,
158 mqtt: MqttProtocol,
159}
160
161impl ProtocolRegistry {
162 pub fn with_builtins() -> Self {
164 Self::default()
165 }
166
167 pub fn http(&self) -> &HttpProtocol {
169 &self.http
170 }
171
172 pub fn grpc(&self) -> &GrpcProtocol {
174 &self.grpc
175 }
176
177 pub fn kafka(&self) -> &KafkaProtocol {
179 &self.kafka
180 }
181
182 pub fn amqp(&self) -> &AmqpProtocol {
184 &self.amqp
185 }
186
187 pub fn mqtt(&self) -> &MqttProtocol {
189 &self.mqtt
190 }
191
192 pub fn decode_attachment(&self, attachment: &ProtocolAttachment) -> Result<DecodedAttachment> {
197 match attachment.protocol_id.as_str() {
198 "http" => decode_http(&attachment.payload).map(DecodedAttachment::Http),
199 "grpc" => decode_grpc(&attachment.payload).map(DecodedAttachment::Grpc),
200 "kafka" => decode_kafka(&attachment.payload).map(DecodedAttachment::Kafka),
201 "amqp" => decode_amqp(&attachment.payload).map(DecodedAttachment::Amqp),
202 "mqtt" => decode_mqtt(&attachment.payload).map(DecodedAttachment::Mqtt),
203 other => Ok(DecodedAttachment::Opaque {
204 protocol_id: other.to_string(),
205 payload: attachment.payload.clone(),
206 }),
207 }
208 }
209
210 pub fn http_operation_from_attachments(
212 &self,
213 protocols: &[ProtocolAttachment],
214 ) -> Option<
215 switchback_codec_pb::canardleteer::switchback::protocol::http::v1alpha1::HttpOperationMeta,
216 > {
217 for attachment in protocols {
218 if let Ok(DecodedAttachment::Http(HttpPayloadKind::Operation(meta))) =
219 self.decode_attachment(attachment)
220 {
221 return Some(meta);
222 }
223 }
224 None
225 }
226
227 pub fn grpc_operation_from_attachments(
229 &self,
230 protocols: &[ProtocolAttachment],
231 ) -> Option<
232 switchback_codec_pb::canardleteer::switchback::protocol::grpc::v1alpha1::GrpcOperationMeta,
233 > {
234 for attachment in protocols {
235 if let Ok(DecodedAttachment::Grpc(GrpcPayloadKind::Operation(meta))) =
236 self.decode_attachment(attachment)
237 {
238 return Some(meta);
239 }
240 }
241 None
242 }
243}
244
245fn decode_http(bytes: &[u8]) -> Result<HttpPayloadKind> {
246 let payload: HttpPayload = decode_message(bytes)?;
247 match payload.kind {
248 Some(HttpKind::Contract(v)) => Ok(HttpPayloadKind::Contract(*v)),
249 Some(HttpKind::Operation(v)) => Ok(HttpPayloadKind::Operation(*v)),
250 Some(HttpKind::Response(v)) => Ok(HttpPayloadKind::Response(*v)),
251 Some(HttpKind::Error(v)) => Ok(HttpPayloadKind::Error(*v)),
252 Some(HttpKind::Parameter(v)) => Ok(HttpPayloadKind::Parameter(*v)),
253 None => Err(SwitchbackError::codec("empty HttpPayload")),
254 }
255}
256
257fn decode_grpc(bytes: &[u8]) -> Result<GrpcPayloadKind> {
258 let payload: GrpcPayload = decode_message(bytes)?;
259 match payload.kind {
260 Some(GrpcKind::Contract(v)) => Ok(GrpcPayloadKind::Contract(*v)),
261 Some(GrpcKind::Operation(v)) => Ok(GrpcPayloadKind::Operation(*v)),
262 Some(GrpcKind::Status(v)) => Ok(GrpcPayloadKind::Status(*v)),
263 Some(GrpcKind::Error(v)) => Ok(GrpcPayloadKind::Error(*v)),
264 Some(GrpcKind::Metadata(v)) => Ok(GrpcPayloadKind::Metadata(*v)),
265 None => Err(SwitchbackError::codec("empty GrpcPayload")),
266 }
267}
268
269fn decode_kafka(bytes: &[u8]) -> Result<KafkaPayloadKind> {
270 let payload: KafkaPayload = decode_message(bytes)?;
271 match payload.kind {
272 Some(KafkaKind::Contract(v)) => Ok(KafkaPayloadKind::Contract(*v)),
273 Some(KafkaKind::Channel(v)) => Ok(KafkaPayloadKind::Channel(*v)),
274 Some(KafkaKind::Operation(v)) => Ok(KafkaPayloadKind::Operation(*v)),
275 Some(KafkaKind::Message(v)) => Ok(KafkaPayloadKind::Message(*v)),
276 None => Err(SwitchbackError::codec("empty KafkaPayload")),
277 }
278}
279
280fn decode_amqp(bytes: &[u8]) -> Result<AmqpPayloadKind> {
281 let payload: AmqpPayload = decode_message(bytes)?;
282 match payload.kind {
283 Some(AmqpKind::Contract(v)) => Ok(AmqpPayloadKind::Contract(*v)),
284 Some(AmqpKind::Channel(v)) => Ok(AmqpPayloadKind::Channel(*v)),
285 Some(AmqpKind::Operation(v)) => Ok(AmqpPayloadKind::Operation(*v)),
286 Some(AmqpKind::Message(v)) => Ok(AmqpPayloadKind::Message(*v)),
287 None => Err(SwitchbackError::codec("empty AmqpPayload")),
288 }
289}
290
291fn decode_mqtt(bytes: &[u8]) -> Result<MqttPayloadKind> {
292 let payload: MqttPayload = decode_message(bytes)?;
293 match payload.kind {
294 Some(MqttKind::Contract(v)) => Ok(MqttPayloadKind::Contract(*v)),
295 Some(MqttKind::Channel(v)) => Ok(MqttPayloadKind::Channel(*v)),
296 Some(MqttKind::Operation(v)) => Ok(MqttPayloadKind::Operation(*v)),
297 Some(MqttKind::Message(v)) => Ok(MqttPayloadKind::Message(*v)),
298 None => Err(SwitchbackError::codec("empty MqttPayload")),
299 }
300}
301
302#[cfg(test)]
303mod coverage_matrix {
304 use super::*;
305 use switchback_codec_pb::canardleteer::switchback::protocol::amqp::v1alpha1::{
306 AmqpChannelMeta, AmqpContractMeta, AmqpMessageMeta, AmqpOperationMeta,
307 };
308 use switchback_codec_pb::canardleteer::switchback::protocol::grpc::v1alpha1::{
309 GrpcContractMeta, GrpcErrorMeta, GrpcMetadataMeta, GrpcOperationMeta, GrpcStatusMeta,
310 };
311 use switchback_codec_pb::canardleteer::switchback::protocol::http::v1alpha1::{
312 HttpContractMeta, HttpErrorMeta, HttpOperationMeta, HttpParameterMeta, HttpResponseMeta,
313 };
314 use switchback_codec_pb::canardleteer::switchback::protocol::kafka::v1alpha1::{
315 KafkaChannelMeta, KafkaContractMeta, KafkaMessageMeta, KafkaOperationMeta,
316 };
317 use switchback_codec_pb::canardleteer::switchback::protocol::mqtt::v1alpha1::{
318 MqttChannelMeta, MqttContractMeta, MqttMessageMeta, MqttOperationMeta,
319 };
320
321 #[test]
322 fn http_matrix_roundtrips() {
323 let registry = ProtocolRegistry::with_builtins();
324 let http = registry.http();
325
326 let cases: Vec<(HttpPayloadKind, ProtocolAttachment)> = vec![
327 (
328 HttpPayloadKind::Contract(HttpContractMeta {
329 default_server_url: "https://api.example.com".into(),
330 ..Default::default()
331 }),
332 http.attach_contract(&HttpContractMeta {
333 default_server_url: "https://api.example.com".into(),
334 ..Default::default()
335 }),
336 ),
337 (
338 HttpPayloadKind::Operation(HttpOperationMeta {
339 method: "GET".into(),
340 path_template: "/pets".into(),
341 ..Default::default()
342 }),
343 http.attach_operation(&HttpOperationMeta {
344 method: "GET".into(),
345 path_template: "/pets".into(),
346 ..Default::default()
347 }),
348 ),
349 (
350 HttpPayloadKind::Response(HttpResponseMeta {
351 status_code: 200,
352 ..Default::default()
353 }),
354 http.attach_response(&HttpResponseMeta {
355 status_code: 200,
356 ..Default::default()
357 }),
358 ),
359 (
360 HttpPayloadKind::Error(HttpErrorMeta {
361 status_code: 404,
362 ..Default::default()
363 }),
364 http.attach_error(&HttpErrorMeta {
365 status_code: 404,
366 ..Default::default()
367 }),
368 ),
369 (
370 HttpPayloadKind::Parameter(HttpParameterMeta {
371 name: "id".into(),
372 location: "path".into(),
373 required: true,
374 ..Default::default()
375 }),
376 http.attach_parameter(&HttpParameterMeta {
377 name: "id".into(),
378 location: "path".into(),
379 required: true,
380 ..Default::default()
381 }),
382 ),
383 ];
384
385 for (expected_kind, attachment) in cases {
386 match registry.decode_attachment(&attachment).unwrap() {
387 DecodedAttachment::Http(kind) => assert_eq!(kind, expected_kind),
388 other => panic!("expected http decode, got {other:?}"),
389 }
390 }
391 }
392
393 #[test]
394 fn grpc_matrix_roundtrips() {
395 let registry = ProtocolRegistry::with_builtins();
396 let grpc = registry.grpc();
397
398 let cases: Vec<(GrpcPayloadKind, ProtocolAttachment)> = vec![
399 (
400 GrpcPayloadKind::Contract(GrpcContractMeta {
401 package_name: "acme.v1".into(),
402 ..Default::default()
403 }),
404 grpc.attach_contract(&GrpcContractMeta {
405 package_name: "acme.v1".into(),
406 ..Default::default()
407 }),
408 ),
409 (
410 GrpcPayloadKind::Operation(GrpcOperationMeta {
411 rpc_name: "GetPet".into(),
412 ..Default::default()
413 }),
414 grpc.attach_operation(&GrpcOperationMeta {
415 rpc_name: "GetPet".into(),
416 ..Default::default()
417 }),
418 ),
419 (
420 GrpcPayloadKind::Status(GrpcStatusMeta {
421 code: 0,
422 message: "OK".into(),
423 ..Default::default()
424 }),
425 grpc.attach_status(&GrpcStatusMeta {
426 code: 0,
427 message: "OK".into(),
428 ..Default::default()
429 }),
430 ),
431 (
432 GrpcPayloadKind::Error(GrpcErrorMeta {
433 code: 5,
434 message: "not found".into(),
435 ..Default::default()
436 }),
437 grpc.attach_error(&GrpcErrorMeta {
438 code: 5,
439 message: "not found".into(),
440 ..Default::default()
441 }),
442 ),
443 (
444 GrpcPayloadKind::Metadata(GrpcMetadataMeta {
445 key: "x-request-id".into(),
446 required: false,
447 ..Default::default()
448 }),
449 grpc.attach_metadata(&GrpcMetadataMeta {
450 key: "x-request-id".into(),
451 required: false,
452 ..Default::default()
453 }),
454 ),
455 ];
456
457 for (expected_kind, attachment) in cases {
458 match registry.decode_attachment(&attachment).unwrap() {
459 DecodedAttachment::Grpc(kind) => assert_eq!(kind, expected_kind),
460 other => panic!("expected grpc decode, got {other:?}"),
461 }
462 }
463 }
464
465 #[test]
466 fn kafka_matrix_roundtrips() {
467 let registry = ProtocolRegistry::with_builtins();
468 let kafka = registry.kafka();
469
470 let cases: Vec<(KafkaPayloadKind, ProtocolAttachment)> = vec![
471 (
472 KafkaPayloadKind::Contract(KafkaContractMeta {
473 bootstrap_servers: vec!["kafka:9092".into()],
474 ..Default::default()
475 }),
476 kafka.attach_contract(&KafkaContractMeta {
477 bootstrap_servers: vec!["kafka:9092".into()],
478 ..Default::default()
479 }),
480 ),
481 (
482 KafkaPayloadKind::Channel(KafkaChannelMeta {
483 topic: "orders".into(),
484 partitions: 12,
485 replicas: 3,
486 ..Default::default()
487 }),
488 kafka.attach_channel(&KafkaChannelMeta {
489 topic: "orders".into(),
490 partitions: 12,
491 replicas: 3,
492 ..Default::default()
493 }),
494 ),
495 (
496 KafkaPayloadKind::Operation(KafkaOperationMeta {
497 group_id: "my-group".into(),
498 client_id: "my-client".into(),
499 ..Default::default()
500 }),
501 kafka.attach_operation(&KafkaOperationMeta {
502 group_id: "my-group".into(),
503 client_id: "my-client".into(),
504 ..Default::default()
505 }),
506 ),
507 (
508 KafkaPayloadKind::Message(KafkaMessageMeta {
509 schema_id_location: "payload".into(),
510 ..Default::default()
511 }),
512 kafka.attach_message(&KafkaMessageMeta {
513 schema_id_location: "payload".into(),
514 ..Default::default()
515 }),
516 ),
517 ];
518
519 for (expected_kind, attachment) in cases {
520 match registry.decode_attachment(&attachment).unwrap() {
521 DecodedAttachment::Kafka(kind) => assert_eq!(kind, expected_kind),
522 other => panic!("expected kafka decode, got {other:?}"),
523 }
524 }
525 }
526
527 #[test]
528 fn amqp_matrix_roundtrips() {
529 let registry = ProtocolRegistry::with_builtins();
530 let amqp = registry.amqp();
531
532 let cases: Vec<(AmqpPayloadKind, ProtocolAttachment)> = vec![
533 (
534 AmqpPayloadKind::Contract(AmqpContractMeta {
535 default_vhost: "/events".into(),
536 ..Default::default()
537 }),
538 amqp.attach_contract(&AmqpContractMeta {
539 default_vhost: "/events".into(),
540 ..Default::default()
541 }),
542 ),
543 (
544 AmqpPayloadKind::Channel(AmqpChannelMeta {
545 channel_kind: "routingKey".into(),
546 exchange_name: "events".into(),
547 exchange_type: "topic".into(),
548 exchange_durable: true,
549 ..Default::default()
550 }),
551 amqp.attach_channel(&AmqpChannelMeta {
552 channel_kind: "routingKey".into(),
553 exchange_name: "events".into(),
554 exchange_type: "topic".into(),
555 exchange_durable: true,
556 ..Default::default()
557 }),
558 ),
559 (
560 AmqpPayloadKind::Operation(AmqpOperationMeta {
561 delivery_mode: 2,
562 priority: 5,
563 ..Default::default()
564 }),
565 amqp.attach_operation(&AmqpOperationMeta {
566 delivery_mode: 2,
567 priority: 5,
568 ..Default::default()
569 }),
570 ),
571 (
572 AmqpPayloadKind::Message(AmqpMessageMeta {
573 content_type: "application/json".into(),
574 ..Default::default()
575 }),
576 amqp.attach_message(&AmqpMessageMeta {
577 content_type: "application/json".into(),
578 ..Default::default()
579 }),
580 ),
581 ];
582
583 for (expected_kind, attachment) in cases {
584 match registry.decode_attachment(&attachment).unwrap() {
585 DecodedAttachment::Amqp(kind) => assert_eq!(kind, expected_kind),
586 other => panic!("expected amqp decode, got {other:?}"),
587 }
588 }
589 }
590
591 #[test]
592 fn mqtt_matrix_roundtrips() {
593 let registry = ProtocolRegistry::with_builtins();
594 let mqtt = registry.mqtt();
595
596 let cases: Vec<(MqttPayloadKind, ProtocolAttachment)> = vec![
597 (
598 MqttPayloadKind::Contract(MqttContractMeta {
599 broker_urls: vec!["mqtt://broker:1883".into()],
600 ..Default::default()
601 }),
602 mqtt.attach_contract(&MqttContractMeta {
603 broker_urls: vec!["mqtt://broker:1883".into()],
604 ..Default::default()
605 }),
606 ),
607 (
608 MqttPayloadKind::Channel(MqttChannelMeta {
609 topic: "streetlights/1/0/event".into(),
610 ..Default::default()
611 }),
612 mqtt.attach_channel(&MqttChannelMeta {
613 topic: "streetlights/1/0/event".into(),
614 ..Default::default()
615 }),
616 ),
617 (
618 MqttPayloadKind::Operation(MqttOperationMeta {
619 qos: 2,
620 retain: true,
621 message_expiry_interval: 60,
622 ..Default::default()
623 }),
624 mqtt.attach_operation(&MqttOperationMeta {
625 qos: 2,
626 retain: true,
627 message_expiry_interval: 60,
628 ..Default::default()
629 }),
630 ),
631 (
632 MqttPayloadKind::Message(MqttMessageMeta {
633 response_topic: "application/responses".into(),
634 ..Default::default()
635 }),
636 mqtt.attach_message(&MqttMessageMeta {
637 response_topic: "application/responses".into(),
638 ..Default::default()
639 }),
640 ),
641 ];
642
643 for (expected_kind, attachment) in cases {
644 match registry.decode_attachment(&attachment).unwrap() {
645 DecodedAttachment::Mqtt(kind) => assert_eq!(kind, expected_kind),
646 other => panic!("expected mqtt decode, got {other:?}"),
647 }
648 }
649 }
650
651 #[test]
652 fn opaque_custom_protocol_passthrough() {
653 let registry = ProtocolRegistry::with_builtins();
654 let attachment = ProtocolAttachment {
655 protocol_id: "acme/custom".into(),
656 payload: vec![1, 2, 3],
657 };
658 match registry.decode_attachment(&attachment).unwrap() {
659 DecodedAttachment::Opaque {
660 protocol_id,
661 payload,
662 } => {
663 assert_eq!(protocol_id, "acme/custom");
664 assert_eq!(payload, vec![1, 2, 3]);
665 }
666 other => panic!("expected opaque, got {other:?}"),
667 }
668 }
669}