binate/frame/codec/
lease.rs

1use super::*;
2use bytes::{Buf, BufMut, Bytes, BytesMut};
3use std::time::Duration;
4
5/// The lease frame.
6///
7/// Lease frames MAY be sent by the client-side or server-side Responders and inform the Requester
8/// that it may send Requests for a period of time and how many it may send during that duration.
9///
10/// # Frame Contents
11///
12/// ```text
13/// 0                   1                   2                   3
14///  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
15/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
16/// |                         Stream ID = 0                         |
17/// +-----------+-+-+---------------+-------------------------------+
18/// |Frame Type |0|M|     Flags     |
19/// +-----------+-+-+---------------+-------------------------------+
20/// |0|                       Time-To-Live                          |
21/// +---------------------------------------------------------------+
22/// |0|                     Number of Requests                      |
23/// +---------------------------------------------------------------+
24///                             Metadata
25/// ```
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct LeaseFrame {
28    ttl: u32,
29    number_of_requests: u32,
30    metadata: Option<Bytes>,
31}
32
33impl LeaseFrame {
34    /// Lease frames MUST always use Stream ID 0 as they pertain to the connection.
35    pub const STREAM_ID: u32 = 0;
36
37    /// Create a new `Lease` frame.
38    ///
39    /// - The `ttl` (Time to Live) is measured in milliseconds.
40    /// - Both `ttl` and `number_of_requests` MUST be <= [`MAX_U31`].
41    pub fn new(
42        ttl: u32,
43        number_of_requests: u32,
44        metadata: Option<Bytes>,
45    ) -> Self {
46        debug_assert_max_u31!(ttl, number_of_requests);
47        LeaseFrame { ttl, number_of_requests, metadata }
48    }
49
50    /// Returns the validity time (in milliseconds) of LEASE from time of reception.
51    pub fn ttl(&self) -> Duration {
52        Duration::from_millis(self.ttl as u64)
53    }
54
55    /// Returns the number of requests that may be sent until next LEASE.
56    pub fn number_of_requests(&self) -> u32 {
57        self.number_of_requests
58    }
59
60    /// Returns the metadata attached to this frame, if any.
61    pub fn metadata(&self) -> Option<&Bytes> {
62        self.metadata.as_ref()
63    }
64}
65
66impl Encode for LeaseFrame {
67    fn encode(&self, buf: &mut BytesMut) {
68        buf.put_u32(0);
69        if self.metadata().is_some() {
70            buf.put_u16(FrameType::LEASE.bits() | Flags::METADATA.bits());
71        } else {
72            buf.put_u16(FrameType::LEASE.bits());
73        };
74        buf.put_u32(self.ttl);
75        buf.put_u32(self.number_of_requests);
76        if let Some(metadata) = &self.metadata {
77            buf.put_slice(metadata);
78        }
79    }
80
81    fn len(&self) -> usize {
82        // len(stream_id): 4
83        // len(flags): 2
84        // len(ttl): 4
85        // len(number_of_requests): 4
86        let mut len = 14;
87
88        // len(metadata)
89        if let Some(metadata) = &self.metadata {
90            len += metadata.len();
91        }
92        len
93    }
94}
95
96impl Decode for LeaseFrame {
97    type Value = Self;
98
99    fn decode<B: Buf>(
100        buf: &mut B,
101        stream_id: u32,
102        _flags: Flags,
103    ) -> Result<Self::Value> {
104        if stream_id != 0 {
105            return Err(DecodeError::InvalidStreamId {
106                expected: "0",
107                found: stream_id,
108            });
109        }
110        let ttl = eat_u31(buf)?;
111        let number_of_requests = eat_u31(buf)?;
112        let metadata = match buf.remaining() {
113            0 => None,
114            len => Some(eat_bytes(buf, len)?),
115        };
116        Ok(LeaseFrame { ttl, number_of_requests, metadata })
117    }
118}
119
#[cfg(test)]
mod tests {
    use super::*;

    /// The stream ID constant of the LEASE frame must be 0.
    #[test]
    fn test_stream_id() {
        assert_eq!(LeaseFrame::STREAM_ID, 0);
    }

    /// Encodes a frame with metadata, decodes it again, and checks that both the content and
    /// the reported length survive the round trip.
    #[test]
    fn test_codec() {
        let original = LeaseFrame::new(10, 20, Some(Bytes::from("metadata")));

        let mut encoded = BytesMut::new();
        original.encode(&mut encoded);
        let mut wire = encoded.freeze();

        // Expected size on the wire:
        //   stream_id: 4, flags: 2, ttl: 4, number_of_requests: 4, metadata: 8
        let wire_len = wire.len();
        assert_eq!(wire_len, 4 + 2 + 4 + 4 + 8);

        // The frame header (stream ID, then type and flags) is consumed first;
        // `decode` only handles what follows it.
        let stream_id = eat_stream_id(&mut wire).unwrap();
        let (frame_type, flags) = eat_flags(&mut wire).unwrap();
        assert_eq!(stream_id, 0);
        assert_eq!(frame_type, FrameType::LEASE);
        assert_eq!(flags, Flags::METADATA);

        let roundtripped = LeaseFrame::decode(&mut wire, stream_id, flags).unwrap();

        assert_eq!(roundtripped, original);
        assert_eq!(original.len(), wire_len);
        assert_eq!(roundtripped.len(), wire_len);
    }
}