dynamo_runtime/transports/event_plane/
codec.rs1use anyhow::Result;
7use bytes::Bytes;
8use serde::{Serialize, de::DeserializeOwned};
9
10use super::EventEnvelope;
11
12#[derive(Debug, Clone, Copy)]
16pub enum Codec {
17 Msgpack(MsgpackCodec),
18}
19
20impl Default for Codec {
21 fn default() -> Self {
22 Codec::Msgpack(MsgpackCodec)
23 }
24}
25
26impl Codec {
27 pub fn encode_envelope(&self, envelope: &EventEnvelope) -> Result<Bytes> {
29 match self {
30 Codec::Msgpack(c) => c.encode_envelope(envelope),
31 }
32 }
33
34 pub fn decode_envelope(&self, bytes: &Bytes) -> Result<EventEnvelope> {
36 match self {
37 Codec::Msgpack(c) => c.decode_envelope(bytes),
38 }
39 }
40
41 pub fn encode_payload<T: Serialize>(&self, payload: &T) -> Result<Bytes> {
43 match self {
44 Codec::Msgpack(c) => c.encode_payload(payload),
45 }
46 }
47
48 pub fn decode_payload<T: DeserializeOwned>(&self, bytes: &Bytes) -> Result<T> {
50 match self {
51 Codec::Msgpack(c) => c.decode_payload(bytes),
52 }
53 }
54
55 pub fn name(&self) -> &'static str {
57 match self {
58 Codec::Msgpack(c) => c.name(),
59 }
60 }
61}
62
63#[derive(Debug, Clone, Copy, Default)]
64pub struct MsgpackCodec;
65
66impl MsgpackCodec {
67 pub fn encode_envelope(&self, envelope: &EventEnvelope) -> Result<Bytes> {
68 Ok(Bytes::from(rmp_serde::to_vec_named(envelope)?))
69 }
70
71 pub fn decode_envelope(&self, bytes: &Bytes) -> Result<EventEnvelope> {
72 Ok(rmp_serde::from_slice(bytes)?)
73 }
74
75 pub fn encode_payload<T: Serialize>(&self, payload: &T) -> Result<Bytes> {
76 Ok(Bytes::from(rmp_serde::to_vec_named(payload)?))
77 }
78
79 pub fn decode_payload<T: DeserializeOwned>(&self, bytes: &Bytes) -> Result<T> {
80 Ok(rmp_serde::from_slice(bytes)?)
81 }
82
83 pub fn name(&self) -> &'static str {
84 "msgpack"
85 }
86}
87
88#[cfg(test)]
89mod tests {
90 use super::*;
91
92 #[derive(Debug, Clone, PartialEq, Serialize, serde::Deserialize)]
93 struct TestEvent {
94 worker_id: u64,
95 message: String,
96 }
97
98 #[test]
99 fn test_msgpack_codec_envelope_roundtrip() {
100 let codec = MsgpackCodec;
101
102 let envelope = EventEnvelope {
103 publisher_id: 12345,
104 sequence: 42,
105 published_at: 1700000000000,
106 topic: "test-topic".to_string(),
107 payload: Bytes::from("test payload"),
108 };
109
110 let encoded = codec.encode_envelope(&envelope).unwrap();
111 let decoded = codec.decode_envelope(&encoded).unwrap();
112
113 assert_eq!(decoded.publisher_id, envelope.publisher_id);
114 assert_eq!(decoded.sequence, envelope.sequence);
115 assert_eq!(decoded.published_at, envelope.published_at);
116 assert_eq!(decoded.topic, envelope.topic);
117 assert_eq!(decoded.payload, envelope.payload);
118 }
119
120 #[test]
121 fn test_msgpack_codec_payload_roundtrip() {
122 let codec = MsgpackCodec;
123
124 let event = TestEvent {
125 worker_id: 123,
126 message: "hello world".to_string(),
127 };
128
129 let encoded = codec.encode_payload(&event).unwrap();
130 let decoded: TestEvent = codec.decode_payload(&encoded).unwrap();
131
132 assert_eq!(decoded, event);
133 }
134}