Skip to main content

switchback_protocols/
registry.rs

1//! Protocol registry for encode/decode of attachments.
2
3use switchback_codec_pb::canardleteer::switchback::protocol::amqp::v1alpha1::AmqpPayload;
4use switchback_codec_pb::canardleteer::switchback::protocol::amqp::v1alpha1::__buffa::oneof::amqp_payload::Kind as AmqpKind;
5use switchback_codec_pb::canardleteer::switchback::protocol::grpc::v1alpha1::GrpcPayload;
6use switchback_codec_pb::canardleteer::switchback::protocol::grpc::v1alpha1::__buffa::oneof::grpc_payload::Kind as GrpcKind;
7use switchback_codec_pb::canardleteer::switchback::protocol::http::v1alpha1::HttpPayload;
8use switchback_codec_pb::canardleteer::switchback::protocol::http::v1alpha1::__buffa::oneof::http_payload::Kind as HttpKind;
9use switchback_codec_pb::canardleteer::switchback::protocol::kafka::v1alpha1::KafkaPayload;
10use switchback_codec_pb::canardleteer::switchback::protocol::kafka::v1alpha1::__buffa::oneof::kafka_payload::Kind as KafkaKind;
11use switchback_codec_pb::canardleteer::switchback::protocol::mqtt::v1alpha1::MqttPayload;
12use switchback_codec_pb::canardleteer::switchback::protocol::mqtt::v1alpha1::__buffa::oneof::mqtt_payload::Kind as MqttKind;
13use switchback_traits::{ProtocolAttachment, Result, SwitchbackError};
14
15use crate::amqp::AmqpProtocol;
16use crate::grpc::GrpcProtocol;
17use crate::http::HttpProtocol;
18use crate::kafka::KafkaProtocol;
19use crate::mqtt::MqttProtocol;
20use crate::wire::decode_message;
21
22/// Decoded HTTP payload oneof arm.
23#[derive(Clone, Debug, PartialEq)]
24pub enum HttpPayloadKind {
25    /// Contract-level metadata.
26    Contract(
27        switchback_codec_pb::canardleteer::switchback::protocol::http::v1alpha1::HttpContractMeta,
28    ),
29    /// Operation invocation metadata.
30    Operation(
31        switchback_codec_pb::canardleteer::switchback::protocol::http::v1alpha1::HttpOperationMeta,
32    ),
33    /// Success response metadata.
34    Response(
35        switchback_codec_pb::canardleteer::switchback::protocol::http::v1alpha1::HttpResponseMeta,
36    ),
37    /// Error response metadata.
38    Error(switchback_codec_pb::canardleteer::switchback::protocol::http::v1alpha1::HttpErrorMeta),
39    /// Parameter metadata.
40    Parameter(
41        switchback_codec_pb::canardleteer::switchback::protocol::http::v1alpha1::HttpParameterMeta,
42    ),
43}
44
45/// Decoded gRPC payload oneof arm.
46#[derive(Clone, Debug, PartialEq)]
47pub enum GrpcPayloadKind {
48    /// Contract-level metadata.
49    Contract(
50        switchback_codec_pb::canardleteer::switchback::protocol::grpc::v1alpha1::GrpcContractMeta,
51    ),
52    /// Operation invocation metadata.
53    Operation(
54        switchback_codec_pb::canardleteer::switchback::protocol::grpc::v1alpha1::GrpcOperationMeta,
55    ),
56    /// Success status metadata.
57    Status(switchback_codec_pb::canardleteer::switchback::protocol::grpc::v1alpha1::GrpcStatusMeta),
58    /// Error metadata.
59    Error(switchback_codec_pb::canardleteer::switchback::protocol::grpc::v1alpha1::GrpcErrorMeta),
60    /// Metadata key.
61    Metadata(
62        switchback_codec_pb::canardleteer::switchback::protocol::grpc::v1alpha1::GrpcMetadataMeta,
63    ),
64}
65
66/// Decoded Kafka payload oneof arm.
67#[derive(Clone, Debug, PartialEq)]
68pub enum KafkaPayloadKind {
69    /// Contract-level metadata.
70    Contract(
71        switchback_codec_pb::canardleteer::switchback::protocol::kafka::v1alpha1::KafkaContractMeta,
72    ),
73    /// Channel metadata.
74    Channel(
75        switchback_codec_pb::canardleteer::switchback::protocol::kafka::v1alpha1::KafkaChannelMeta,
76    ),
77    /// Operation metadata.
78    Operation(
79        switchback_codec_pb::canardleteer::switchback::protocol::kafka::v1alpha1::KafkaOperationMeta,
80    ),
81    /// Message metadata.
82    Message(
83        switchback_codec_pb::canardleteer::switchback::protocol::kafka::v1alpha1::KafkaMessageMeta,
84    ),
85}
86
87/// Decoded AMQP payload oneof arm.
88#[derive(Clone, Debug, PartialEq)]
89pub enum AmqpPayloadKind {
90    /// Contract-level metadata.
91    Contract(
92        switchback_codec_pb::canardleteer::switchback::protocol::amqp::v1alpha1::AmqpContractMeta,
93    ),
94    /// Channel metadata.
95    Channel(
96        switchback_codec_pb::canardleteer::switchback::protocol::amqp::v1alpha1::AmqpChannelMeta,
97    ),
98    /// Operation metadata.
99    Operation(
100        switchback_codec_pb::canardleteer::switchback::protocol::amqp::v1alpha1::AmqpOperationMeta,
101    ),
102    /// Message metadata.
103    Message(
104        switchback_codec_pb::canardleteer::switchback::protocol::amqp::v1alpha1::AmqpMessageMeta,
105    ),
106}
107
108/// Decoded MQTT payload oneof arm.
109#[derive(Clone, Debug, PartialEq)]
110pub enum MqttPayloadKind {
111    /// Contract-level metadata.
112    Contract(
113        switchback_codec_pb::canardleteer::switchback::protocol::mqtt::v1alpha1::MqttContractMeta,
114    ),
115    /// Channel metadata.
116    Channel(
117        switchback_codec_pb::canardleteer::switchback::protocol::mqtt::v1alpha1::MqttChannelMeta,
118    ),
119    /// Operation metadata.
120    Operation(
121        switchback_codec_pb::canardleteer::switchback::protocol::mqtt::v1alpha1::MqttOperationMeta,
122    ),
123    /// Message metadata.
124    Message(
125        switchback_codec_pb::canardleteer::switchback::protocol::mqtt::v1alpha1::MqttMessageMeta,
126    ),
127}
128
129/// Result of decoding a [`ProtocolAttachment`].
130#[derive(Clone, Debug, PartialEq)]
131pub enum DecodedAttachment {
132    /// Known HTTP payload arm.
133    Http(HttpPayloadKind),
134    /// Known gRPC payload arm.
135    Grpc(GrpcPayloadKind),
136    /// Known Kafka payload arm.
137    Kafka(KafkaPayloadKind),
138    /// Known AMQP payload arm.
139    Amqp(AmqpPayloadKind),
140    /// Known MQTT payload arm.
141    Mqtt(MqttPayloadKind),
142    /// Unknown or custom protocol; bytes round-trip opaquely.
143    Opaque {
144        /// Protocol slug from the attachment envelope.
145        protocol_id: String,
146        /// Opaque payload bytes.
147        payload: Vec<u8>,
148    },
149}
150
151/// Registry of built-in protocol decoders.
152#[derive(Clone, Debug, Default)]
153pub struct ProtocolRegistry {
154    http: HttpProtocol,
155    grpc: GrpcProtocol,
156    kafka: KafkaProtocol,
157    amqp: AmqpProtocol,
158    mqtt: MqttProtocol,
159}
160
161impl ProtocolRegistry {
162    /// Built-in registry with `http`, `grpc`, `kafka`, `amqp`, and `mqtt` registered.
163    pub fn with_builtins() -> Self {
164        Self::default()
165    }
166
167    /// HTTP protocol implementation.
168    pub fn http(&self) -> &HttpProtocol {
169        &self.http
170    }
171
172    /// gRPC protocol implementation.
173    pub fn grpc(&self) -> &GrpcProtocol {
174        &self.grpc
175    }
176
177    /// Kafka protocol implementation.
178    pub fn kafka(&self) -> &KafkaProtocol {
179        &self.kafka
180    }
181
182    /// AMQP protocol implementation.
183    pub fn amqp(&self) -> &AmqpProtocol {
184        &self.amqp
185    }
186
187    /// MQTT protocol implementation.
188    pub fn mqtt(&self) -> &MqttProtocol {
189        &self.mqtt
190    }
191
192    /// Decode a protocol attachment envelope.
193    ///
194    /// Built-in ids deserialize to the matching [`DecodedAttachment`] variant;
195    /// other ids return [`DecodedAttachment::Opaque`] with bytes unchanged.
196    pub fn decode_attachment(&self, attachment: &ProtocolAttachment) -> Result<DecodedAttachment> {
197        match attachment.protocol_id.as_str() {
198            "http" => decode_http(&attachment.payload).map(DecodedAttachment::Http),
199            "grpc" => decode_grpc(&attachment.payload).map(DecodedAttachment::Grpc),
200            "kafka" => decode_kafka(&attachment.payload).map(DecodedAttachment::Kafka),
201            "amqp" => decode_amqp(&attachment.payload).map(DecodedAttachment::Amqp),
202            "mqtt" => decode_mqtt(&attachment.payload).map(DecodedAttachment::Mqtt),
203            other => Ok(DecodedAttachment::Opaque {
204                protocol_id: other.to_string(),
205                payload: attachment.payload.clone(),
206            }),
207        }
208    }
209
210    /// Find the first HTTP operation meta on an operation body's attachments.
211    pub fn http_operation_from_attachments(
212        &self,
213        protocols: &[ProtocolAttachment],
214    ) -> Option<
215        switchback_codec_pb::canardleteer::switchback::protocol::http::v1alpha1::HttpOperationMeta,
216    > {
217        for attachment in protocols {
218            if let Ok(DecodedAttachment::Http(HttpPayloadKind::Operation(meta))) =
219                self.decode_attachment(attachment)
220            {
221                return Some(meta);
222            }
223        }
224        None
225    }
226
227    /// Find the first gRPC operation meta on an operation body's attachments.
228    pub fn grpc_operation_from_attachments(
229        &self,
230        protocols: &[ProtocolAttachment],
231    ) -> Option<
232        switchback_codec_pb::canardleteer::switchback::protocol::grpc::v1alpha1::GrpcOperationMeta,
233    > {
234        for attachment in protocols {
235            if let Ok(DecodedAttachment::Grpc(GrpcPayloadKind::Operation(meta))) =
236                self.decode_attachment(attachment)
237            {
238                return Some(meta);
239            }
240        }
241        None
242    }
243}
244
245fn decode_http(bytes: &[u8]) -> Result<HttpPayloadKind> {
246    let payload: HttpPayload = decode_message(bytes)?;
247    match payload.kind {
248        Some(HttpKind::Contract(v)) => Ok(HttpPayloadKind::Contract(*v)),
249        Some(HttpKind::Operation(v)) => Ok(HttpPayloadKind::Operation(*v)),
250        Some(HttpKind::Response(v)) => Ok(HttpPayloadKind::Response(*v)),
251        Some(HttpKind::Error(v)) => Ok(HttpPayloadKind::Error(*v)),
252        Some(HttpKind::Parameter(v)) => Ok(HttpPayloadKind::Parameter(*v)),
253        None => Err(SwitchbackError::codec("empty HttpPayload")),
254    }
255}
256
257fn decode_grpc(bytes: &[u8]) -> Result<GrpcPayloadKind> {
258    let payload: GrpcPayload = decode_message(bytes)?;
259    match payload.kind {
260        Some(GrpcKind::Contract(v)) => Ok(GrpcPayloadKind::Contract(*v)),
261        Some(GrpcKind::Operation(v)) => Ok(GrpcPayloadKind::Operation(*v)),
262        Some(GrpcKind::Status(v)) => Ok(GrpcPayloadKind::Status(*v)),
263        Some(GrpcKind::Error(v)) => Ok(GrpcPayloadKind::Error(*v)),
264        Some(GrpcKind::Metadata(v)) => Ok(GrpcPayloadKind::Metadata(*v)),
265        None => Err(SwitchbackError::codec("empty GrpcPayload")),
266    }
267}
268
269fn decode_kafka(bytes: &[u8]) -> Result<KafkaPayloadKind> {
270    let payload: KafkaPayload = decode_message(bytes)?;
271    match payload.kind {
272        Some(KafkaKind::Contract(v)) => Ok(KafkaPayloadKind::Contract(*v)),
273        Some(KafkaKind::Channel(v)) => Ok(KafkaPayloadKind::Channel(*v)),
274        Some(KafkaKind::Operation(v)) => Ok(KafkaPayloadKind::Operation(*v)),
275        Some(KafkaKind::Message(v)) => Ok(KafkaPayloadKind::Message(*v)),
276        None => Err(SwitchbackError::codec("empty KafkaPayload")),
277    }
278}
279
280fn decode_amqp(bytes: &[u8]) -> Result<AmqpPayloadKind> {
281    let payload: AmqpPayload = decode_message(bytes)?;
282    match payload.kind {
283        Some(AmqpKind::Contract(v)) => Ok(AmqpPayloadKind::Contract(*v)),
284        Some(AmqpKind::Channel(v)) => Ok(AmqpPayloadKind::Channel(*v)),
285        Some(AmqpKind::Operation(v)) => Ok(AmqpPayloadKind::Operation(*v)),
286        Some(AmqpKind::Message(v)) => Ok(AmqpPayloadKind::Message(*v)),
287        None => Err(SwitchbackError::codec("empty AmqpPayload")),
288    }
289}
290
291fn decode_mqtt(bytes: &[u8]) -> Result<MqttPayloadKind> {
292    let payload: MqttPayload = decode_message(bytes)?;
293    match payload.kind {
294        Some(MqttKind::Contract(v)) => Ok(MqttPayloadKind::Contract(*v)),
295        Some(MqttKind::Channel(v)) => Ok(MqttPayloadKind::Channel(*v)),
296        Some(MqttKind::Operation(v)) => Ok(MqttPayloadKind::Operation(*v)),
297        Some(MqttKind::Message(v)) => Ok(MqttPayloadKind::Message(*v)),
298        None => Err(SwitchbackError::codec("empty MqttPayload")),
299    }
300}
301
#[cfg(test)]
mod coverage_matrix {
    //! Round-trip matrix: each built-in payload arm is attached through its
    //! protocol helper and must decode back to the same metadata value.

    use super::*;
    use switchback_codec_pb::canardleteer::switchback::protocol::amqp::v1alpha1::{
        AmqpChannelMeta, AmqpContractMeta, AmqpMessageMeta, AmqpOperationMeta,
    };
    use switchback_codec_pb::canardleteer::switchback::protocol::grpc::v1alpha1::{
        GrpcContractMeta, GrpcErrorMeta, GrpcMetadataMeta, GrpcOperationMeta, GrpcStatusMeta,
    };
    use switchback_codec_pb::canardleteer::switchback::protocol::http::v1alpha1::{
        HttpContractMeta, HttpErrorMeta, HttpOperationMeta, HttpParameterMeta, HttpResponseMeta,
    };
    use switchback_codec_pb::canardleteer::switchback::protocol::kafka::v1alpha1::{
        KafkaChannelMeta, KafkaContractMeta, KafkaMessageMeta, KafkaOperationMeta,
    };
    use switchback_codec_pb::canardleteer::switchback::protocol::mqtt::v1alpha1::{
        MqttChannelMeta, MqttContractMeta, MqttMessageMeta, MqttOperationMeta,
    };

    #[test]
    fn http_matrix_roundtrips() {
        let registry = ProtocolRegistry::with_builtins();
        let http = registry.http();

        let contract = HttpContractMeta {
            default_server_url: "https://api.example.com".into(),
            ..Default::default()
        };
        let operation = HttpOperationMeta {
            method: "GET".into(),
            path_template: "/pets".into(),
            ..Default::default()
        };
        let response = HttpResponseMeta {
            status_code: 200,
            ..Default::default()
        };
        let error = HttpErrorMeta {
            status_code: 404,
            ..Default::default()
        };
        let parameter = HttpParameterMeta {
            name: "id".into(),
            location: "path".into(),
            required: true,
            ..Default::default()
        };

        // Each tuple attaches by reference first, then moves the same meta
        // value into the expected arm, so input and expectation cannot drift.
        let cases: Vec<(ProtocolAttachment, HttpPayloadKind)> = vec![
            (http.attach_contract(&contract), HttpPayloadKind::Contract(contract)),
            (http.attach_operation(&operation), HttpPayloadKind::Operation(operation)),
            (http.attach_response(&response), HttpPayloadKind::Response(response)),
            (http.attach_error(&error), HttpPayloadKind::Error(error)),
            (http.attach_parameter(&parameter), HttpPayloadKind::Parameter(parameter)),
        ];

        for (attachment, expected) in cases {
            let decoded = registry.decode_attachment(&attachment).unwrap();
            match decoded {
                DecodedAttachment::Http(actual) => assert_eq!(actual, expected),
                other => panic!("expected http decode, got {other:?}"),
            }
        }
    }

    #[test]
    fn grpc_matrix_roundtrips() {
        let registry = ProtocolRegistry::with_builtins();
        let grpc = registry.grpc();

        let contract = GrpcContractMeta {
            package_name: "acme.v1".into(),
            ..Default::default()
        };
        let operation = GrpcOperationMeta {
            rpc_name: "GetPet".into(),
            ..Default::default()
        };
        let status = GrpcStatusMeta {
            code: 0,
            message: "OK".into(),
            ..Default::default()
        };
        let error = GrpcErrorMeta {
            code: 5,
            message: "not found".into(),
            ..Default::default()
        };
        let metadata = GrpcMetadataMeta {
            key: "x-request-id".into(),
            required: false,
            ..Default::default()
        };

        // Attach by reference first, then move the meta into the expectation.
        let cases: Vec<(ProtocolAttachment, GrpcPayloadKind)> = vec![
            (grpc.attach_contract(&contract), GrpcPayloadKind::Contract(contract)),
            (grpc.attach_operation(&operation), GrpcPayloadKind::Operation(operation)),
            (grpc.attach_status(&status), GrpcPayloadKind::Status(status)),
            (grpc.attach_error(&error), GrpcPayloadKind::Error(error)),
            (grpc.attach_metadata(&metadata), GrpcPayloadKind::Metadata(metadata)),
        ];

        for (attachment, expected) in cases {
            let decoded = registry.decode_attachment(&attachment).unwrap();
            match decoded {
                DecodedAttachment::Grpc(actual) => assert_eq!(actual, expected),
                other => panic!("expected grpc decode, got {other:?}"),
            }
        }
    }

    #[test]
    fn kafka_matrix_roundtrips() {
        let registry = ProtocolRegistry::with_builtins();
        let kafka = registry.kafka();

        let contract = KafkaContractMeta {
            bootstrap_servers: vec!["kafka:9092".into()],
            ..Default::default()
        };
        let channel = KafkaChannelMeta {
            topic: "orders".into(),
            partitions: 12,
            replicas: 3,
            ..Default::default()
        };
        let operation = KafkaOperationMeta {
            group_id: "my-group".into(),
            client_id: "my-client".into(),
            ..Default::default()
        };
        let message = KafkaMessageMeta {
            schema_id_location: "payload".into(),
            ..Default::default()
        };

        // Attach by reference first, then move the meta into the expectation.
        let cases: Vec<(ProtocolAttachment, KafkaPayloadKind)> = vec![
            (kafka.attach_contract(&contract), KafkaPayloadKind::Contract(contract)),
            (kafka.attach_channel(&channel), KafkaPayloadKind::Channel(channel)),
            (kafka.attach_operation(&operation), KafkaPayloadKind::Operation(operation)),
            (kafka.attach_message(&message), KafkaPayloadKind::Message(message)),
        ];

        for (attachment, expected) in cases {
            let decoded = registry.decode_attachment(&attachment).unwrap();
            match decoded {
                DecodedAttachment::Kafka(actual) => assert_eq!(actual, expected),
                other => panic!("expected kafka decode, got {other:?}"),
            }
        }
    }

    #[test]
    fn amqp_matrix_roundtrips() {
        let registry = ProtocolRegistry::with_builtins();
        let amqp = registry.amqp();

        let contract = AmqpContractMeta {
            default_vhost: "/events".into(),
            ..Default::default()
        };
        let channel = AmqpChannelMeta {
            channel_kind: "routingKey".into(),
            exchange_name: "events".into(),
            exchange_type: "topic".into(),
            exchange_durable: true,
            ..Default::default()
        };
        let operation = AmqpOperationMeta {
            delivery_mode: 2,
            priority: 5,
            ..Default::default()
        };
        let message = AmqpMessageMeta {
            content_type: "application/json".into(),
            ..Default::default()
        };

        // Attach by reference first, then move the meta into the expectation.
        let cases: Vec<(ProtocolAttachment, AmqpPayloadKind)> = vec![
            (amqp.attach_contract(&contract), AmqpPayloadKind::Contract(contract)),
            (amqp.attach_channel(&channel), AmqpPayloadKind::Channel(channel)),
            (amqp.attach_operation(&operation), AmqpPayloadKind::Operation(operation)),
            (amqp.attach_message(&message), AmqpPayloadKind::Message(message)),
        ];

        for (attachment, expected) in cases {
            let decoded = registry.decode_attachment(&attachment).unwrap();
            match decoded {
                DecodedAttachment::Amqp(actual) => assert_eq!(actual, expected),
                other => panic!("expected amqp decode, got {other:?}"),
            }
        }
    }

    #[test]
    fn mqtt_matrix_roundtrips() {
        let registry = ProtocolRegistry::with_builtins();
        let mqtt = registry.mqtt();

        let contract = MqttContractMeta {
            broker_urls: vec!["mqtt://broker:1883".into()],
            ..Default::default()
        };
        let channel = MqttChannelMeta {
            topic: "streetlights/1/0/event".into(),
            ..Default::default()
        };
        let operation = MqttOperationMeta {
            qos: 2,
            retain: true,
            message_expiry_interval: 60,
            ..Default::default()
        };
        let message = MqttMessageMeta {
            response_topic: "application/responses".into(),
            ..Default::default()
        };

        // Attach by reference first, then move the meta into the expectation.
        let cases: Vec<(ProtocolAttachment, MqttPayloadKind)> = vec![
            (mqtt.attach_contract(&contract), MqttPayloadKind::Contract(contract)),
            (mqtt.attach_channel(&channel), MqttPayloadKind::Channel(channel)),
            (mqtt.attach_operation(&operation), MqttPayloadKind::Operation(operation)),
            (mqtt.attach_message(&message), MqttPayloadKind::Message(message)),
        ];

        for (attachment, expected) in cases {
            let decoded = registry.decode_attachment(&attachment).unwrap();
            match decoded {
                DecodedAttachment::Mqtt(actual) => assert_eq!(actual, expected),
                other => panic!("expected mqtt decode, got {other:?}"),
            }
        }
    }

    #[test]
    fn opaque_custom_protocol_passthrough() {
        let registry = ProtocolRegistry::with_builtins();
        let attachment = ProtocolAttachment {
            protocol_id: "acme/custom".into(),
            payload: vec![1, 2, 3],
        };

        let decoded = registry.decode_attachment(&attachment).unwrap();
        match decoded {
            DecodedAttachment::Opaque {
                protocol_id,
                payload,
            } => {
                assert_eq!(protocol_id, "acme/custom");
                assert_eq!(payload, vec![1, 2, 3]);
            }
            other => panic!("expected opaque, got {other:?}"),
        }
    }
}