atm0s_sdn_network/base/
msg.rs

1use atm0s_sdn_identity::NodeId;
2use atm0s_sdn_router::{RouteRule, ServiceBroadcastLevel};
3use atm0s_sdn_utils::simple_pub_type;
4use bytes::BufMut;
5use sans_io_runtime::Buffer;
6use serde::de::DeserializeOwned;
7use serde::Serialize;
8
9pub const DEFAULT_MSG_TTL: u8 = 64;
10
11const ROUTE_RULE_DIRECT: u8 = 0;
12const ROUTE_RULE_TO_NODE: u8 = 1;
13const ROUTE_RULE_TO_SERVICE: u8 = 2;
14const ROUTE_RULE_TO_SERVICES: u8 = 3;
15const ROUTE_RULE_TO_KEY: u8 = 4;
16
17simple_pub_type!(Ttl, u8);
18
19impl Default for Ttl {
20    fn default() -> Self {
21        Ttl(DEFAULT_MSG_TTL)
22    }
23}
24
25#[derive(Debug, Eq, PartialEq)]
26pub enum TransportMsgHeaderError {
27    InvalidVersion,
28    InvalidRoute,
29    TooSmall,
30}
31
32/// Fixed Header Fields
33///
34/// ```text
35///     0                   1                   2                   3
36///     0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
37///    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
38///    |V=0|E|N|   R   |      TTL      |  Feature       |     Meta     |
39///    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
40///    |                         Route destination (Opt)               |
41///    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
42///    |                         FromNodeId (Opt)                      |
43///    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
44/// ```
45///
46/// In there
47///
48/// - Version (V) : 2 bits (now is 0)
49/// - Encrypt (E): 1 bits, If this bit is set, this msg should be encrypted
50/// - From Node (N)    : 1 bits, If this bit is set, from node_id will occupy 32 bits in header
51/// - Route Type (R): 4 bits
52///
53///     - 0: Direct : which node received this msg will handle it, no route destination
54///     - 1: ToNode : which node received this msg will route it to node_id
55///     - 2: ToService : which node received this msg will route it to service meta
56///     - 3: ToKey : which node received this msg will route it to key
57///     - .. Not used
58///
59/// - Ttl (TTL): 8 bits
60/// - Feature Id: 8 bits
61///
62/// - Route destination (Route Destination): 32 bits (if R is not Direct)
63///
64///     - If route type is ToNode, this field is 32bit node_id
65///     - If route type is ToService, this field is 32bit service meta
66///     - If route type is ToKey, this field is 32bit key
67///
68/// - From Node Id: 32 bits (optional if N bit is set)
69///
70
71#[derive(Clone, Debug, Eq, PartialEq)]
72pub struct TransportMsgHeader {
73    pub version: u8,
74    pub encrypt: bool,
75    pub route: RouteRule,
76    pub ttl: u8,
77    pub feature: u8,
78    pub meta: u8,
79    /// Which can be anonymous or specific node
80    pub from_node: Option<NodeId>,
81}
82
83impl Default for TransportMsgHeader {
84    fn default() -> Self {
85        Self::new()
86    }
87}
88
89impl TransportMsgHeader {
90    pub fn is_secure(first_byte: u8) -> bool {
91        first_byte & 0b0010_0000 != 0
92    }
93
94    /// Builds a message with the given service_id, route rule.
95    pub fn new() -> Self {
96        Self {
97            version: 0,
98            encrypt: false,
99            route: RouteRule::Direct,
100            ttl: DEFAULT_MSG_TTL,
101            feature: 0,
102            meta: 0,
103            from_node: None,
104        }
105    }
106
107    pub fn build(feature: u8, meta: u8, route: RouteRule) -> Self {
108        Self {
109            version: 0,
110            encrypt: false,
111            route,
112            ttl: DEFAULT_MSG_TTL,
113            feature,
114            meta,
115            from_node: None,
116        }
117    }
118
119    /// Set ttl
120    pub fn set_ttl(mut self, ttl: u8) -> Self {
121        self.ttl = ttl;
122        self
123    }
124
125    /// Set secure
126    pub fn set_encrypt(mut self, encrypt: bool) -> Self {
127        self.encrypt = encrypt;
128        self
129    }
130
131    /// Set from node
132    pub fn set_from_node(mut self, from_node: Option<NodeId>) -> Self {
133        self.from_node = from_node;
134        self
135    }
136
137    /// Set to feature
138    pub fn set_feature(mut self, feature: u8) -> Self {
139        self.feature = feature;
140        self
141    }
142
143    /// Set to service_id
144    pub fn set_meta(mut self, meta: u8) -> Self {
145        self.meta = meta;
146        self
147    }
148
149    /// Set rule
150    pub fn set_route(mut self, route: RouteRule) -> Self {
151        self.route = route;
152        self
153    }
154
155    /// Converts the message to a byte representation and appends it to the given output vector.
156    ///
157    /// # Arguments
158    ///
159    /// * `output` - A mutable vector of bytes to append the serialized message to.
160    ///
161    /// # Returns
162    ///
163    /// An `Option` containing the number of bytes written if the output vector was large enough, or `None` if the output vector was too small.
164    #[allow(unused_assignments)]
165    pub fn to_bytes(&self, output: &mut [u8]) -> Option<usize> {
166        if output.remaining_mut() < self.serialize_size() {
167            return None;
168        }
169
170        let e_bit = if self.encrypt {
171            1 << 5
172        } else {
173            0
174        };
175        let n_bit = if self.from_node.is_some() {
176            1 << 4
177        } else {
178            0
179        };
180
181        let route_type = match self.route {
182            RouteRule::Direct => ROUTE_RULE_DIRECT,
183            RouteRule::ToNode(_) => ROUTE_RULE_TO_NODE,
184            RouteRule::ToService(_) => ROUTE_RULE_TO_SERVICE,
185            RouteRule::ToServices(_, _, _) => ROUTE_RULE_TO_SERVICES,
186            RouteRule::ToKey(_) => ROUTE_RULE_TO_KEY,
187        };
188
189        output[0] = (self.version << 6) | e_bit | n_bit | (route_type & 15);
190        output[1] = self.ttl;
191        output[2] = self.feature;
192        output[3] = self.meta;
193        let mut ptr = 4;
194        match self.route {
195            RouteRule::Direct => {
196                // Dont need append anything
197            }
198            RouteRule::ToNode(node_id) => {
199                output[ptr..ptr + 4].copy_from_slice(&node_id.to_be_bytes());
200                ptr += 4;
201            }
202            RouteRule::ToService(service) => {
203                output[ptr] = service;
204                ptr += 4;
205            }
206            RouteRule::ToServices(service, level, seq) => {
207                output[ptr] = service;
208                output[ptr + 1] = level.into();
209                output[ptr + 2..ptr + 4].copy_from_slice(&seq.to_be_bytes());
210                ptr += 4;
211            }
212            RouteRule::ToKey(key) => {
213                output[ptr..ptr + 4].copy_from_slice(&key.to_be_bytes());
214                ptr += 4;
215            }
216        }
217        if let Some(from_node) = self.from_node {
218            output[ptr..ptr + 4].copy_from_slice(&from_node.to_be_bytes());
219            ptr += 4;
220        }
221
222        Some(
223            4 + if self.from_node.is_some() {
224                4
225            } else {
226                0
227            } + if self.route == RouteRule::Direct {
228                0
229            } else {
230                4
231            },
232        )
233    }
234
235    /// Rewrite the ttl in the given buffer with the new ttl.
236    ///
237    /// # Arguments
238    ///
239    /// * `buf` - A mutable slice of bytes representing the buffer to rewrite the ttl in.
240    /// * `new_ttl` - The new ttl to use for rewriting the ttl.
241    ///
242    /// # Returns
243    ///
244    /// An `Option` containing `()` if the ttl was successfully rewritten,
245    /// or `None` if the buffer is too small to hold the new ttl.
246    pub fn rewrite_ttl(buf: &mut [u8], new_ttl: u8) -> Option<()> {
247        if buf.len() < 2 {
248            return None;
249        }
250        buf[1] = new_ttl;
251        Some(())
252    }
253
254    /// Decrease the ttl in the given buffer.
255    ///
256    /// # Arguments
257    ///
258    /// * `buf` - A mutable slice of bytes representing the buffer to decrease the ttl in.
259    ///
260    /// # Returns
261    ///
262    /// An `Option` containing `()` if the ttl was successfully decreased,
263    /// or `None` if the buffer is too small to hold the new ttl or ttl too small.
264    pub fn decrease_ttl(buf: &mut [u8]) -> bool {
265        if buf.len() < 2 {
266            return false;
267        }
268        if buf[1] == 0 {
269            return false;
270        }
271        buf[1] = buf[1].saturating_sub(1);
272        true
273    }
274
275    /// Returns the size of the serialized message.
276    pub fn serialize_size(&self) -> usize {
277        4 + if self.from_node.is_some() {
278            4
279        } else {
280            0
281        } + if self.route == RouteRule::Direct {
282            0
283        } else {
284            4
285        }
286    }
287}
288
289#[derive(Clone, Debug, Eq, PartialEq)]
290/// A struct representing a transport message.
291pub struct TransportMsg {
292    buffer: Buffer,
293    pub header: TransportMsgHeader,
294    pub payload_start: usize,
295}
296
297/// TransportMsg represents a message that can be sent over the network.
298/// It contains methods for building, modifying, and extracting data from the message.
299impl TransportMsg {
300    /// Check if the message is secure
301    pub fn is_secure_header(first_byte: u8) -> bool {
302        (first_byte >> 2) & 1 == 1
303    }
304
305    /// Builds a raw message from a message header and payload.
306    ///
307    /// # Arguments
308    ///
309    /// * `header` - The message header.
310    /// * `payload` - The message payload.
311    ///
312    /// # Returns
313    ///
314    /// A new `TransportMsg` instance.
315    pub fn build_raw(header: TransportMsgHeader, mut payload: Buffer) -> Self {
316        let header_size = header.serialize_size();
317        let _ = header.to_bytes(payload.front_mut(header_size)).expect("Should serialize header");
318        payload.move_front_left(header_size);
319        Self {
320            buffer: payload,
321            header,
322            payload_start: header_size,
323        }
324    }
325
326    /// Builds a message from a service ID, route rule, stream ID, and payload.
327    ///
328    /// # Arguments
329    ///
330    /// * `service_id` - The service ID of the message.
331    /// * `route` - The route rule of the message.
332    /// * `stream_id` - The stream ID of the message.
333    /// * `payload` - The payload of the message.
334    ///
335    /// # Returns
336    ///
337    /// A new `TransportMsg` instance.
338    pub fn build(feature: u8, meta: u8, route: RouteRule, payload: &[u8]) -> Self {
339        let header = TransportMsgHeader::new().set_feature(feature).set_meta(meta).set_route(route);
340        let header_size = header.serialize_size();
341        let mut buffer = Buffer::new(0, header_size + payload.len());
342        let _ = header.to_bytes(buffer.back_mut(header_size)).expect("Should serialize header");
343        buffer.move_back_right(header_size);
344        buffer.push_back(payload);
345
346        Self {
347            buffer,
348            header,
349            payload_start: header_size,
350        }
351    }
352
353    /// Takes ownership of the message and returns its buffer.
354    pub fn take(self) -> Buffer {
355        self.buffer
356    }
357
358    /// Returns a reference to the message buffer.
359    pub fn get_buf(&self) -> &[u8] {
360        &self.buffer
361    }
362
363    /// Returns a reference to the message payload.
364    pub fn payload(&self) -> &[u8] {
365        &self.buffer[self.payload_start..]
366    }
367
368    /// Returns a mutable reference to the message payload.
369    pub fn payload_mut(&mut self) -> &mut [u8] {
370        &mut self.buffer[self.payload_start..]
371    }
372
373    /// Deserializes the message payload into a given type using bincode.
374    ///
375    /// # Type Parameters
376    ///
377    /// * `M` - The type to deserialize the payload into.
378    ///
379    /// # Returns
380    ///
381    /// A `bincode::Result` containing the deserialized payload.
382    pub fn get_payload_bincode<M: DeserializeOwned>(&self) -> bincode::Result<M> {
383        bincode::deserialize::<M>(self.payload())
384    }
385
386    /// Constructs a TransportMsg from a message header and payload using bincode.
387    ///
388    /// # Arguments
389    ///
390    /// * `header` - The message header.
391    /// * `msg` - The message payload.
392    ///
393    /// # Returns
394    ///
395    /// A new `TransportMsg` instance.
396    pub fn from_payload_bincode<M: Serialize>(header: TransportMsgHeader, msg: &M) -> Self {
397        let header_size = header.serialize_size();
398        let payload_size = bincode::serialized_size(msg).expect("Should serialize payload") as usize;
399        let mut buffer = Buffer::new(0, header_size + payload_size);
400        let _ = header.to_bytes(buffer.back_mut(header_size)).expect("Should serialize header");
401        buffer.move_back_right(header_size);
402        bincode::serialize_into(buffer.back_mut(payload_size), msg).expect("Should serialize payload");
403        buffer.move_back_right(payload_size);
404
405        Self {
406            buffer,
407            header,
408            payload_start: header_size,
409        }
410    }
411}
412
413impl TryFrom<Vec<u8>> for TransportMsg {
414    type Error = TransportMsgHeaderError;
415    fn try_from(buffer: Vec<u8>) -> Result<Self, Self::Error> {
416        let header = TransportMsgHeader::try_from(buffer.as_slice())?;
417        Ok(Self {
418            buffer: buffer.into(),
419            payload_start: header.serialize_size(),
420            header,
421        })
422    }
423}
424
425impl TryFrom<&[u8]> for TransportMsg {
426    type Error = TransportMsgHeaderError;
427    fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
428        let header = TransportMsgHeader::try_from(bytes)?;
429        Ok(Self {
430            buffer: bytes.to_vec().into(),
431            payload_start: header.serialize_size(),
432            header,
433        })
434    }
435}
436
437#[allow(unused_assignments)]
438impl TryFrom<&[u8]> for TransportMsgHeader {
439    type Error = TransportMsgHeaderError;
440    fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
441        if bytes.len() < 4 {
442            return Err(TransportMsgHeaderError::TooSmall);
443        }
444        let version = bytes[0] >> 6; //2 bits
445        let e_bit = (bytes[0] >> 5) & 1 == 1; //1 bit
446        let n_bit = (bytes[0] >> 4) & 1 == 1; //1 bit
447        let route_type = bytes[0] & 15; //4 bits
448
449        if version != 0 {
450            return Err(TransportMsgHeaderError::InvalidVersion);
451        }
452
453        let ttl = bytes[1];
454        let feature = bytes[2];
455        let meta = bytes[3];
456
457        let mut ptr = 4;
458
459        let route = match route_type {
460            ROUTE_RULE_DIRECT => RouteRule::Direct,
461            ROUTE_RULE_TO_NODE => {
462                if bytes.len() < ptr + 4 {
463                    return Err(TransportMsgHeaderError::TooSmall);
464                }
465                let rr = RouteRule::ToNode(NodeId::from_be_bytes([bytes[ptr], bytes[ptr + 1], bytes[ptr + 2], bytes[ptr + 3]]));
466                ptr += 4;
467                rr
468            }
469            ROUTE_RULE_TO_SERVICE => {
470                if bytes.len() < ptr + 4 {
471                    return Err(TransportMsgHeaderError::TooSmall);
472                }
473                let rr = RouteRule::ToService(bytes[ptr]);
474                ptr += 4;
475                rr
476            }
477            ROUTE_RULE_TO_SERVICES => {
478                if bytes.len() < ptr + 4 {
479                    return Err(TransportMsgHeaderError::TooSmall);
480                }
481                let rr = RouteRule::ToServices(bytes[ptr], ServiceBroadcastLevel::from(bytes[ptr + 1]), u16::from_be_bytes([bytes[ptr + 2], bytes[ptr + 3]]));
482                ptr += 4;
483                rr
484            }
485            ROUTE_RULE_TO_KEY => {
486                if bytes.len() < ptr + 4 {
487                    return Err(TransportMsgHeaderError::TooSmall);
488                }
489                let rr = RouteRule::ToKey(NodeId::from_be_bytes([bytes[ptr], bytes[ptr + 1], bytes[ptr + 2], bytes[ptr + 3]]));
490                ptr += 4;
491                rr
492            }
493            _ => return Err(TransportMsgHeaderError::InvalidRoute),
494        };
495
496        let from_node = if n_bit {
497            if bytes.len() < ptr + 4 {
498                return Err(TransportMsgHeaderError::TooSmall);
499            }
500            let from_node_id = NodeId::from_be_bytes([bytes[ptr], bytes[ptr + 1], bytes[ptr + 2], bytes[ptr + 3]]);
501            ptr += 4;
502            Some(from_node_id)
503        } else {
504            None
505        };
506
507        Ok(Self {
508            version,
509            encrypt: e_bit,
510            ttl,
511            route,
512            feature,
513            meta,
514            from_node,
515        })
516    }
517}
518
519#[cfg(test)]
520mod tests {
521    use super::*;
522
523    /// test header without option
524    #[test]
525    fn test_header_without_option() {
526        let mut buf = vec![0u8; 16];
527        let header = TransportMsgHeader {
528            version: 0,
529            ttl: 1,
530            feature: 2,
531            meta: 3,
532            route: RouteRule::Direct,
533            encrypt: true,
534            from_node: None,
535        };
536        let size = header.to_bytes(&mut buf).expect("should serialize");
537        assert_eq!(header.serialize_size(), 4);
538        let header = TransportMsgHeader::try_from(&buf[0..size]).expect("");
539        assert_eq!(header.serialize_size(), 4);
540        assert_eq!(header.version, 0);
541        assert_eq!(header.ttl, 1);
542        assert_eq!(header.feature, 2);
543        assert_eq!(header.meta, 3);
544        assert_eq!(header.route, RouteRule::Direct);
545        assert_eq!(header.encrypt, true);
546        assert_eq!(header.from_node, None);
547    }
548
549    /// test header without option
550    #[test]
551    fn test_header_with_node_dest() {
552        let mut buf = [0; 16];
553        let header = TransportMsgHeader {
554            version: 0,
555            ttl: 1,
556            feature: 2,
557            meta: 3,
558            route: RouteRule::ToNode(4),
559            encrypt: true,
560            from_node: None,
561        };
562        let size = header.to_bytes(&mut buf).expect("should serialize");
563        assert_eq!(header.serialize_size(), 8);
564        let header = TransportMsgHeader::try_from(&buf[0..size]).expect("");
565        assert_eq!(header.version, 0);
566        assert_eq!(header.ttl, 1);
567        assert_eq!(header.feature, 2);
568        assert_eq!(header.meta, 3);
569        assert_eq!(header.route, RouteRule::ToNode(4));
570        assert_eq!(header.from_node, None);
571    }
572
573    /// test header without option
574    #[test]
575    fn test_header_with_service_dest() {
576        let mut buf = [0; 16];
577        let header = TransportMsgHeader {
578            version: 0,
579            ttl: 1,
580            feature: 2,
581            meta: 3,
582            route: RouteRule::ToServices(4, ServiceBroadcastLevel::Geo2, 1000),
583            encrypt: true,
584            from_node: None,
585        };
586        let size = header.to_bytes(&mut buf).expect("should serialize");
587        assert_eq!(header.serialize_size(), 8);
588        let header = TransportMsgHeader::try_from(&buf[0..size]).expect("");
589        assert_eq!(header.version, 0);
590        assert_eq!(header.ttl, 1);
591        assert_eq!(header.feature, 2);
592        assert_eq!(header.meta, 3);
593        assert_eq!(header.route, RouteRule::ToServices(4, ServiceBroadcastLevel::Geo2, 1000));
594        assert_eq!(header.from_node, None);
595    }
596
597    /// test header without option
598    #[test]
599    fn test_header_with_all_options() {
600        let mut buf = [0; 16];
601        let header = TransportMsgHeader {
602            version: 0,
603            ttl: 1,
604            feature: 2,
605            meta: 3,
606            route: RouteRule::ToService(4),
607            encrypt: true,
608            from_node: Some(5),
609        };
610        let size = header.to_bytes(&mut buf).expect("should serialize");
611        assert_eq!(header.serialize_size(), 12);
612        let header = TransportMsgHeader::try_from(&buf[0..size]).expect("");
613        assert_eq!(header.version, 0);
614        assert_eq!(header.ttl, 1);
615        assert_eq!(header.feature, 2);
616        assert_eq!(header.meta, 3);
617        assert_eq!(header.route, RouteRule::ToService(4));
618        assert_eq!(header.from_node, Some(5));
619    }
620
621    /// test with invalid version
622    #[test]
623    fn test_with_invalid_version() {
624        let mut buf = [0; 16];
625        let header = TransportMsgHeader {
626            version: 1,
627            ttl: 1,
628            feature: 2,
629            meta: 3,
630            route: RouteRule::ToNode(4),
631            encrypt: true,
632            from_node: Some(5),
633        };
634        let size = header.to_bytes(&mut buf).expect("should serialize");
635        let err = TransportMsgHeader::try_from(&buf[0..size]).unwrap_err();
636        assert_eq!(err, TransportMsgHeaderError::InvalidVersion);
637    }
638
639    #[test]
640    fn msg_simple() {
641        let msg = TransportMsg::build(0, 0, RouteRule::Direct, &[1, 2, 3, 4]);
642        let buf = msg.get_buf();
643        let msg2 = TransportMsg::try_from(buf).expect("");
644        assert_eq!(msg, msg2);
645        assert_eq!(msg.payload(), &[1, 2, 3, 4]);
646    }
647
648    #[test]
649    fn msg_build_raw() {
650        let msg = TransportMsg::build_raw(TransportMsgHeader::new(), vec![1, 2, 3, 4].into());
651        let buf = msg.get_buf().to_vec();
652        let msg2 = TransportMsg::try_from(buf).expect("");
653        assert_eq!(msg, msg2);
654        assert_eq!(msg.payload(), &[1, 2, 3, 4]);
655    }
656}