Skip to main content

synapse_rpc/codec/
protobuf.rs

//! Protobuf codec implementation.
2
3use super::{Codec, ContentType};
4use anyhow::{Context, Result};
5use bytes::{Bytes, BytesMut};
6use prost::Message;
7use synapse_proto::SynapseMessage;
8
/// Protobuf codec for binary serialization.
///
/// Uses prost for encoding/decoding protobuf messages.
/// This is the production format for service-to-service communication.
///
/// The type is a field-less marker, so it derives the common traits to stay
/// usable in `{:?}` output, generic bounds, and `Default`-constructed
/// containers.
#[derive(Debug, Clone, Copy, Default)]
pub struct ProtobufCodec;
14
15impl Codec for ProtobufCodec {
16    fn encode(message: &SynapseMessage) -> Result<Bytes> {
17        let mut buf = BytesMut::with_capacity(message.encoded_len());
18        message
19            .encode(&mut buf)
20            .context("Failed to encode as protobuf")?;
21        Ok(buf.freeze())
22    }
23
24    fn decode(data: &[u8]) -> Result<SynapseMessage> {
25        SynapseMessage::decode(data).context("Failed to decode protobuf")
26    }
27
28    fn content_type() -> ContentType {
29        ContentType::Protobuf
30    }
31}
32
#[cfg(test)]
mod tests {
    use super::*;
    use synapse_proto::{MessageKind, RpcRequest, RpcResponse, RpcStatus, synapse_message};

    /// Encodes `msg` with the codec and decodes the resulting bytes again,
    /// panicking if either step fails.
    fn roundtrip(msg: &SynapseMessage) -> SynapseMessage {
        let encoded = ProtobufCodec::encode(msg).unwrap();
        ProtobufCodec::decode(&encoded).unwrap()
    }

    /// A request must survive encode + decode with every field intact,
    /// including the envelope's request id and the request's payload.
    #[test]
    fn test_request_roundtrip() {
        let msg = SynapseMessage {
            protocol_version: 1_000_000,
            kind: MessageKind::RpcRequest as i32,
            request_id: Bytes::from(vec![0u8; 16]),
            message: Some(synapse_message::Message::RpcRequest(RpcRequest {
                interface_id: 12345,
                method_id: 67890,
                headers: vec![],
                payload: Bytes::from("test payload"),
                sent_at_unix_ms: 1234567890123,
            })),
        };

        let decoded = roundtrip(&msg);

        // Envelope fields.
        assert_eq!(msg.protocol_version, decoded.protocol_version);
        assert_eq!(msg.kind, decoded.kind);
        assert_eq!(msg.request_id, decoded.request_id);

        // Request body fields.
        match decoded.message {
            Some(synapse_message::Message::RpcRequest(req)) => {
                assert_eq!(12345, req.interface_id);
                assert_eq!(67890, req.method_id);
                assert!(req.headers.is_empty());
                assert_eq!(Bytes::from("test payload"), req.payload);
                assert_eq!(1234567890123, req.sent_at_unix_ms);
            }
            _ => panic!("Expected RpcRequest"),
        }
    }

    /// A response must survive encode + decode with every field intact.
    #[test]
    fn test_response_roundtrip() {
        let msg = SynapseMessage {
            protocol_version: 1_000_000,
            kind: MessageKind::RpcResponse as i32,
            request_id: Bytes::from(vec![0u8; 16]),
            message: Some(synapse_message::Message::RpcResponse(RpcResponse {
                status: RpcStatus::Ok as i32,
                payload: Bytes::from("response payload"),
                error: None,
                headers: vec![],
                responded_at_unix_ms: 1234567890123,
            })),
        };

        let decoded = roundtrip(&msg);

        // Envelope fields.
        assert_eq!(msg.protocol_version, decoded.protocol_version);
        assert_eq!(msg.kind, decoded.kind);
        assert_eq!(msg.request_id, decoded.request_id);

        // Response body fields.
        match decoded.message {
            Some(synapse_message::Message::RpcResponse(resp)) => {
                assert_eq!(RpcStatus::Ok as i32, resp.status);
                assert_eq!(Bytes::from("response payload"), resp.payload);
                assert!(resp.error.is_none());
                assert!(resp.headers.is_empty());
                assert_eq!(1234567890123, resp.responded_at_unix_ms);
            }
            _ => panic!("Expected RpcResponse"),
        }
    }

    /// Bytes that are not a protobuf encoding must be rejected, not panic.
    #[test]
    fn test_invalid_data() {
        // Plain ASCII text; its first byte (`n`, 0x6E) reads as a field tag
        // whose wire type (6) is not defined by the protobuf encoding.
        let invalid_data = b"not valid protobuf";
        assert!(ProtobufCodec::decode(invalid_data).is_err());
    }
}
97}