Skip to main content

dynamo_runtime/transports/event_plane/
codec.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Event plane codec for serializing/deserializing envelopes and payloads.
5
6use anyhow::Result;
7use bytes::Bytes;
8use serde::{Serialize, de::DeserializeOwned};
9
10use super::EventEnvelope;
11
12/// Codec for serializing and deserializing event envelopes and payloads.
13///
14/// Currently only supports MessagePack for all transports.
15#[derive(Debug, Clone, Copy)]
16pub enum Codec {
17    Msgpack(MsgpackCodec),
18}
19
20impl Default for Codec {
21    fn default() -> Self {
22        Codec::Msgpack(MsgpackCodec)
23    }
24}
25
26impl Codec {
27    /// Encode an EventEnvelope to wire bytes
28    pub fn encode_envelope(&self, envelope: &EventEnvelope) -> Result<Bytes> {
29        match self {
30            Codec::Msgpack(c) => c.encode_envelope(envelope),
31        }
32    }
33
34    /// Decode wire bytes to an EventEnvelope
35    pub fn decode_envelope(&self, bytes: &Bytes) -> Result<EventEnvelope> {
36        match self {
37            Codec::Msgpack(c) => c.decode_envelope(bytes),
38        }
39    }
40
41    /// Encode a typed payload to bytes (for embedding in envelope)
42    pub fn encode_payload<T: Serialize>(&self, payload: &T) -> Result<Bytes> {
43        match self {
44            Codec::Msgpack(c) => c.encode_payload(payload),
45        }
46    }
47
48    /// Decode payload bytes to a typed value
49    pub fn decode_payload<T: DeserializeOwned>(&self, bytes: &Bytes) -> Result<T> {
50        match self {
51            Codec::Msgpack(c) => c.decode_payload(bytes),
52        }
53    }
54
55    /// Codec name for debugging
56    pub fn name(&self) -> &'static str {
57        match self {
58            Codec::Msgpack(c) => c.name(),
59        }
60    }
61}
62
63#[derive(Debug, Clone, Copy, Default)]
64pub struct MsgpackCodec;
65
66impl MsgpackCodec {
67    pub fn encode_envelope(&self, envelope: &EventEnvelope) -> Result<Bytes> {
68        Ok(Bytes::from(rmp_serde::to_vec_named(envelope)?))
69    }
70
71    pub fn decode_envelope(&self, bytes: &Bytes) -> Result<EventEnvelope> {
72        Ok(rmp_serde::from_slice(bytes)?)
73    }
74
75    pub fn encode_payload<T: Serialize>(&self, payload: &T) -> Result<Bytes> {
76        Ok(Bytes::from(rmp_serde::to_vec_named(payload)?))
77    }
78
79    pub fn decode_payload<T: DeserializeOwned>(&self, bytes: &Bytes) -> Result<T> {
80        Ok(rmp_serde::from_slice(bytes)?)
81    }
82
83    pub fn name(&self) -> &'static str {
84        "msgpack"
85    }
86}
87
88#[cfg(test)]
89mod tests {
90    use super::*;
91
92    #[derive(Debug, Clone, PartialEq, Serialize, serde::Deserialize)]
93    struct TestEvent {
94        worker_id: u64,
95        message: String,
96    }
97
98    #[test]
99    fn test_msgpack_codec_envelope_roundtrip() {
100        let codec = MsgpackCodec;
101
102        let envelope = EventEnvelope {
103            publisher_id: 12345,
104            sequence: 42,
105            published_at: 1700000000000,
106            topic: "test-topic".to_string(),
107            payload: Bytes::from("test payload"),
108        };
109
110        let encoded = codec.encode_envelope(&envelope).unwrap();
111        let decoded = codec.decode_envelope(&encoded).unwrap();
112
113        assert_eq!(decoded.publisher_id, envelope.publisher_id);
114        assert_eq!(decoded.sequence, envelope.sequence);
115        assert_eq!(decoded.published_at, envelope.published_at);
116        assert_eq!(decoded.topic, envelope.topic);
117        assert_eq!(decoded.payload, envelope.payload);
118    }
119
120    #[test]
121    fn test_msgpack_codec_payload_roundtrip() {
122        let codec = MsgpackCodec;
123
124        let event = TestEvent {
125            worker_id: 123,
126            message: "hello world".to_string(),
127        };
128
129        let encoded = codec.encode_payload(&event).unwrap();
130        let decoded: TestEvent = codec.decode_payload(&encoded).unwrap();
131
132        assert_eq!(decoded, event);
133    }
134}