Skip to main content

yantrikdb_protocol/
codec.rs

1use bytes::BytesMut;
2use tokio_util::codec::{Decoder, Encoder};
3
4use crate::error::ProtocolError;
5use crate::frame::Frame;
6
7/// Tokio codec for YantrikDB wire protocol frames.
8///
9/// Handles length-delimited framing with the Frame encode/decode logic.
10#[derive(Debug, Default)]
11pub struct YantrikCodec;
12
13impl YantrikCodec {
14    pub fn new() -> Self {
15        Self
16    }
17}
18
19impl Decoder for YantrikCodec {
20    type Item = Frame;
21    type Error = ProtocolError;
22
23    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
24        Frame::decode(src)
25    }
26}
27
28impl Encoder<Frame> for YantrikCodec {
29    type Error = ProtocolError;
30
31    fn encode(&mut self, item: Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
32        item.encode(dst);
33        Ok(())
34    }
35}
36
37#[cfg(test)]
38mod tests {
39    use super::*;
40    use crate::opcodes::OpCode;
41    use bytes::Bytes;
42
43    #[test]
44    fn codec_roundtrip() {
45        let mut codec = YantrikCodec::new();
46        let frame = Frame::new(OpCode::Recall, 99, Bytes::from("test query"));
47
48        let mut buf = BytesMut::new();
49        codec.encode(frame, &mut buf).unwrap();
50
51        let decoded = codec.decode(&mut buf).unwrap().unwrap();
52        assert_eq!(decoded.opcode, OpCode::Recall);
53        assert_eq!(decoded.stream_id, 99);
54        assert_eq!(decoded.payload, Bytes::from("test query"));
55    }
56
57    #[test]
58    fn codec_partial_then_complete() {
59        let mut codec = YantrikCodec::new();
60        let frame = Frame::new(OpCode::Remember, 1, Bytes::from("data"));
61
62        let mut full = BytesMut::new();
63        codec.encode(frame, &mut full).unwrap();
64
65        // Split into two parts
66        let mut part1 = full.split_to(5);
67
68        // First decode: not enough data
69        assert!(codec.decode(&mut part1).unwrap().is_none());
70
71        // Append the rest
72        part1.unsplit(full);
73
74        // Now it should decode
75        let decoded = codec.decode(&mut part1).unwrap().unwrap();
76        assert_eq!(decoded.opcode, OpCode::Remember);
77    }
78
79    #[test]
80    fn codec_multiple_frames() {
81        let mut codec = YantrikCodec::new();
82        let mut buf = BytesMut::new();
83
84        let f1 = Frame::empty(OpCode::Ping, 0);
85        let f2 = Frame::new(OpCode::Remember, 1, Bytes::from("hello"));
86        let f3 = Frame::empty(OpCode::Pong, 0);
87
88        codec.encode(f1, &mut buf).unwrap();
89        codec.encode(f2, &mut buf).unwrap();
90        codec.encode(f3, &mut buf).unwrap();
91
92        let d1 = codec.decode(&mut buf).unwrap().unwrap();
93        assert_eq!(d1.opcode, OpCode::Ping);
94
95        let d2 = codec.decode(&mut buf).unwrap().unwrap();
96        assert_eq!(d2.opcode, OpCode::Remember);
97        assert_eq!(d2.payload, Bytes::from("hello"));
98
99        let d3 = codec.decode(&mut buf).unwrap().unwrap();
100        assert_eq!(d3.opcode, OpCode::Pong);
101
102        // Nothing left
103        assert!(codec.decode(&mut buf).unwrap().is_none());
104    }
105}