fast_rpc/
protocol.rs

1// Copyright 2020 Joyent, Inc.
2
3//! This module contains the types and functions used to encode and decode Fast
4//! messages. The contents of this module are not needed for normal client or
5//! server consumers of this crate, but they are exposed for the special case of
6//! someone needing to implement custom client or server code.
7
8use std::io::{Error, ErrorKind};
9use std::sync::atomic::AtomicUsize;
10use std::time::{SystemTime, UNIX_EPOCH};
11use std::{io, str, usize};
12
13use byteorder::{BigEndian, ByteOrder};
14use bytes::{BufMut, BytesMut};
15use crc16::*;
16use num::{FromPrimitive, ToPrimitive};
17use num_derive::{FromPrimitive, ToPrimitive};
18use serde_derive::{Deserialize, Serialize};
19use serde_json::Value;
20use tokio_io::_tokio_codec::{Decoder, Encoder};
21
// Byte offsets of the fields inside a Fast message header. Byte zero carries
// the protocol version; it is followed by a one-byte type, a one-byte status,
// and three big-endian 32-bit fields (message id, CRC, data length). Each
// offset is derived from the previous field's offset plus that field's width.
const FP_OFF_TYPE: usize = 1;
const FP_OFF_STATUS: usize = FP_OFF_TYPE + 1;
const FP_OFF_MSGID: usize = FP_OFF_STATUS + 1;
const FP_OFF_CRC: usize = FP_OFF_MSGID + 4;
const FP_OFF_DATALEN: usize = FP_OFF_CRC + 4;
const FP_OFF_DATA: usize = FP_OFF_DATALEN + 4;

/// The size in bytes of a Fast message header, i.e. the offset at which the
/// data payload begins.
pub const FP_HEADER_SZ: usize = FP_OFF_DATA;

// The protocol version written into the first byte of every encoded message.
const FP_VERSION_2: u8 = 2;
const FP_VERSION_CURRENT: u8 = FP_VERSION_2;
34
/// A data type representing a Fast message id. The counter is stored in an
/// `AtomicUsize`; the `next` associated function hands out the next id value
/// and manages the circular message id space internally.
#[derive(Default)]
pub struct FastMessageId(AtomicUsize);

impl FastMessageId {
    /// Creates a new `FastMessageId` whose first issued id is zero.
    pub fn new() -> Self {
        Self(AtomicUsize::new(0))
    }
}

impl Iterator for FastMessageId {
    type Item = usize;

    /// Returns the current Fast message id and then advances the stored value
    /// by one, modulo `usize::max_value() - 1`. This iterator never yields
    /// `None`.
    fn next(&mut self) -> Option<Self::Item> {
        // The value handed out is the one held *before* the increment, which
        // is why a fresh counter starts at zero.
        let counter = self.0.get_mut();
        let issued = *counter;
        *counter = (issued + 1) % (usize::max_value() - 1);

        Some(issued)
    }
}
62
/// An error type representing a failure to parse a buffer as a Fast message.
#[derive(Debug)]
pub enum FastParseError {
    /// The buffer does not yet hold a complete message. The payload is the
    /// number of bytes that were available when parsing was attempted.
    NotEnoughBytes(usize),
    /// Any other parse failure, carried as an `io::Error`.
    IOError(Error),
}

impl std::fmt::Display for FastParseError {
    /// Formats the error with the same text that the conversion to
    /// `io::Error` produces.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            FastParseError::NotEnoughBytes(_) => {
                f.write_str("Unable to parse message: not enough bytes")
            }
            FastParseError::IOError(e) => std::fmt::Display::fmt(e, f),
        }
    }
}

impl std::error::Error for FastParseError {
    /// The `IOError` variant is transparent: its message is the wrapped
    /// error's message, so the reported source is the wrapped error's source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            FastParseError::NotEnoughBytes(_) => None,
            FastParseError::IOError(e) => std::error::Error::source(e),
        }
    }
}

impl From<io::Error> for FastParseError {
    /// Wraps an `io::Error` in the `IOError` variant.
    fn from(error: io::Error) -> Self {
        FastParseError::IOError(error)
    }
}

impl From<FastParseError> for Error {
    /// Converts a parse error into an `io::Error`. A wrapped `io::Error` is
    /// returned unchanged; `NotEnoughBytes` becomes an `ErrorKind::Other`
    /// error carrying the fixed "not enough bytes" message.
    fn from(pfr: FastParseError) -> Self {
        match pfr {
            FastParseError::IOError(e) => e,
            not_enough @ FastParseError::NotEnoughBytes(_) => {
                Error::new(ErrorKind::Other, not_enough.to_string())
            }
        }
    }
}
87
88/// An error type representing Fast error messages that may be returned from a
89/// Fast server.
90#[derive(Debug, Deserialize, Serialize)]
91pub struct FastMessageServerError {
92    pub name: String,
93    pub message: String,
94}
95
96impl FastMessageServerError {
97    pub fn new(name: &str, message: &str) -> Self {
98        FastMessageServerError {
99            name: String::from(name),
100            message: String::from(message),
101        }
102    }
103}
104
105impl From<FastMessageServerError> for Error {
106    fn from(err: FastMessageServerError) -> Self {
107        Error::new(ErrorKind::Other, format!("{}: {}", err.name, err.message))
108    }
109}
110
/// Represents the Type field of a Fast message. Currently there is only one
/// valid value, JSON. The discriminant is the byte value read from and written
/// to the message header.
#[derive(Debug, FromPrimitive, ToPrimitive, PartialEq, Clone)]
pub enum FastMessageType {
    /// The data payload is JSON.
    Json = 1,
}
117
/// Represents the Status field of a Fast message. The discriminant is the byte
/// value read from and written to the message header.
#[derive(Debug, FromPrimitive, ToPrimitive, PartialEq, Clone)]
pub enum FastMessageStatus {
    /// A `DATA` message.
    Data = 1,
    /// An `END` message.
    End = 2,
    /// An `ERROR` message.
    Error = 3,
}
125
126/// This type encapsulates the header of a Fast message.
127pub struct FastMessageHeader {
128    /// The Type field of the Fast message
129    msg_type: FastMessageType,
130    /// The Status field of the Fast message
131    status: FastMessageStatus,
132    /// The Fast message identifier
133    id: u32,
134    /// The CRC16 check value of the Fast message data payload
135    crc: u32,
136    /// The length in bytes of the Fast message data payload
137    data_len: usize,
138}
139
140/// Represents the metadata about a `FastMessage` data payload. This includes a
141/// timestamp and an RPC method name.
142#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
143pub struct FastMessageMetaData {
144    pub uts: u64,
145    pub name: String,
146}
147
148impl FastMessageMetaData {
149    pub fn new(n: String) -> FastMessageMetaData {
150        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
151        let now_micros =
152            now.as_secs() * 1_000_000 + u64::from(now.subsec_micros());
153
154        FastMessageMetaData {
155            uts: now_micros,
156            name: n,
157        }
158    }
159}
160
161/// Encapsulates the Fast message metadata and the JSON formatted message data.
162#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
163pub struct FastMessageData {
164    pub m: FastMessageMetaData,
165    pub d: Value,
166}
167
168impl FastMessageData {
169    pub fn new(n: String, d: Value) -> FastMessageData {
170        FastMessageData {
171            m: FastMessageMetaData::new(n),
172            d,
173        }
174    }
175}
176
177/// Represents a Fast message including the header and data payload
178#[derive(Debug, Clone)]
179pub struct FastMessage {
180    /// The Type field of the Fast message
181    pub msg_type: FastMessageType,
182    /// The Status field of the Fast message
183    pub status: FastMessageStatus,
184    /// The Fast message identifier
185    pub id: u32,
186    /// The length in bytes of the Fast message data payload
187    pub msg_size: Option<usize>,
188    /// The data payload of the Fast message
189    pub data: FastMessageData,
190}
191
192impl PartialEq for FastMessage {
193    fn eq(&self, other: &FastMessage) -> bool {
194        self.msg_type == other.msg_type
195            && self.status == other.status
196            && self.id == other.id
197            && self.msg_size == other.msg_size
198            && self.data == other.data
199    }
200}
201
202impl FastMessage {
203    /// Parse a byte buffer into a `FastMessage`. Returns a `FastParseError` if
204    /// the available bytes cannot be parsed to a `FastMessage`.
205    pub fn parse(buf: &[u8]) -> Result<FastMessage, FastParseError> {
206        FastMessage::check_buffer_size(buf)?;
207        let header = FastMessage::parse_header(buf)?;
208
209        FastMessage::validate_data_length(buf, header.data_len)?;
210        let raw_data = &buf[FP_OFF_DATA..FP_OFF_DATA + header.data_len];
211        FastMessage::validate_crc(raw_data, header.crc)?;
212        let data = FastMessage::parse_data(raw_data)?;
213
214        let msg_size = match header.status {
215            FastMessageStatus::End => None,
216            _ => Some(FP_OFF_DATA + header.data_len),
217        };
218
219        Ok(FastMessage {
220            msg_type: header.msg_type,
221            status: header.status,
222            id: header.id,
223            msg_size,
224            data,
225        })
226    }
227
228    /// Check that the provided byte buffer contains at least `FP_HEADER_SZ`
229    /// bytes.  Returns a `FastParseError` if this is not the case.
230    pub fn check_buffer_size(buf: &[u8]) -> Result<(), FastParseError> {
231        if buf.len() < FP_HEADER_SZ {
232            Err(FastParseError::NotEnoughBytes(buf.len()))
233        } else {
234            Ok(())
235        }
236    }
237
238    /// Parse a portion of a byte buffer into a `FastMessageHeader`. Returns a
239    /// `FastParseError` if the available bytes cannot be parsed to a
240    /// `FastMessageHeader`.
241    pub fn parse_header(
242        buf: &[u8],
243    ) -> Result<FastMessageHeader, FastParseError> {
244        let msg_type =
245            FromPrimitive::from_u8(buf[FP_OFF_TYPE]).ok_or_else(|| {
246                let msg = "Failed to parse message type";
247                FastParseError::IOError(Error::new(ErrorKind::Other, msg))
248            })?;
249        let status =
250            FromPrimitive::from_u8(buf[FP_OFF_STATUS]).ok_or_else(|| {
251                let msg = "Failed to parse message status";
252                FastParseError::IOError(Error::new(ErrorKind::Other, msg))
253            })?;
254        let msg_id = BigEndian::read_u32(&buf[FP_OFF_MSGID..FP_OFF_MSGID + 4]);
255        let expected_crc =
256            BigEndian::read_u32(&buf[FP_OFF_CRC..FP_OFF_CRC + 4]);
257        let data_len =
258            BigEndian::read_u32(&buf[FP_OFF_DATALEN..FP_OFF_DATALEN + 4])
259                as usize;
260
261        Ok(FastMessageHeader {
262            msg_type,
263            status,
264            id: msg_id,
265            crc: expected_crc,
266            data_len,
267        })
268    }
269
270    fn validate_data_length(
271        buf: &[u8],
272        data_length: usize,
273    ) -> Result<(), FastParseError> {
274        if buf.len() < (FP_HEADER_SZ + data_length) {
275            Err(FastParseError::NotEnoughBytes(buf.len()))
276        } else {
277            Ok(())
278        }
279    }
280
281    fn validate_crc(data_buf: &[u8], crc: u32) -> Result<(), FastParseError> {
282        let calculated_crc = u32::from(State::<ARC>::calculate(data_buf));
283        if crc != calculated_crc {
284            let msg = "Calculated CRC does not match the provided CRC";
285            Err(FastParseError::IOError(Error::new(ErrorKind::Other, msg)))
286        } else {
287            Ok(())
288        }
289    }
290
291    fn parse_data(data_buf: &[u8]) -> Result<FastMessageData, FastParseError> {
292        match str::from_utf8(data_buf) {
293            Ok(data_str) => serde_json::from_str(data_str).map_err(|_e| {
294                let msg = "Failed to parse data payload as JSON";
295                FastParseError::IOError(Error::new(ErrorKind::Other, msg))
296            }),
297            Err(_) => {
298                let msg = "Failed to parse data payload as UTF-8";
299                Err(FastParseError::IOError(Error::new(ErrorKind::Other, msg)))
300            }
301        }
302    }
303
304    /// Returns a `FastMessage` that represents a Fast protocol `DATA` message
305    /// with the provided message identifer and data payload.
306    pub fn data(msg_id: u32, data: FastMessageData) -> FastMessage {
307        FastMessage {
308            msg_type: FastMessageType::Json,
309            status: FastMessageStatus::Data,
310            id: msg_id,
311            msg_size: None,
312            data,
313        }
314    }
315
316    /// Returns a `FastMessage` that represents a Fast protocol `END` message
317    /// with the provided message identifer. The method parameter is used in the
318    /// otherwise empty data payload.
319    pub fn end(msg_id: u32, method: String) -> FastMessage {
320        FastMessage {
321            msg_type: FastMessageType::Json,
322            status: FastMessageStatus::End,
323            id: msg_id,
324            msg_size: None,
325            data: FastMessageData::new(method, Value::Array(vec![])),
326        }
327    }
328
329    /// Returns a `FastMessage` that represents a Fast protocol `ERROR` message
330    /// with the provided message identifer and data payload.
331    pub fn error(msg_id: u32, data: FastMessageData) -> FastMessage {
332        FastMessage {
333            msg_type: FastMessageType::Json,
334            status: FastMessageStatus::Error,
335            id: msg_id,
336            msg_size: None,
337            data,
338        }
339    }
340}
341
342/// This type implements the functions necessary for the Fast protocl framing.
343pub struct FastRpc;
344
345impl Decoder for FastRpc {
346    type Item = Vec<FastMessage>;
347    type Error = Error;
348
349    fn decode(
350        &mut self,
351        buf: &mut BytesMut,
352    ) -> Result<Option<Self::Item>, Error> {
353        let mut msgs: Self::Item = Vec::new();
354        let mut done = false;
355
356        while !done && !buf.is_empty() {
357            // Make sure there is room in msgs to fit a message
358            if msgs.len() + 1 > msgs.capacity() {
359                msgs.reserve(1);
360            }
361
362            match FastMessage::parse(&buf) {
363                Ok(parsed_msg) => {
364                    // TODO: Handle the error case here!
365                    let data_str =
366                        serde_json::to_string(&parsed_msg.data).unwrap();
367                    let data_len = data_str.len();
368                    buf.advance(FP_HEADER_SZ + data_len);
369                    msgs.push(parsed_msg);
370                    Ok(())
371                }
372                Err(FastParseError::NotEnoughBytes(_)) => {
373                    // Not enough bytes available yet so we need to return
374                    // Ok(None) to let the Framed instance know to read more
375                    // data before calling this function again.
376                    done = true;
377                    Ok(())
378                }
379                Err(err) => {
380                    let msg = format!(
381                        "failed to parse Fast request: {}",
382                        Error::from(err)
383                    );
384                    Err(Error::new(ErrorKind::Other, msg))
385                }
386            }?
387        }
388
389        if msgs.is_empty() {
390            Ok(None)
391        } else {
392            Ok(Some(msgs))
393        }
394    }
395}
396
397impl Encoder for FastRpc {
398    type Item = Vec<FastMessage>;
399    //TODO: Create custom FastMessage error type
400    type Error = io::Error;
401    fn encode(
402        &mut self,
403        item: Self::Item,
404        buf: &mut BytesMut,
405    ) -> Result<(), io::Error> {
406        let results: Vec<Result<(), String>> =
407            item.iter().map(|x| encode_msg(x, buf)).collect();
408        let result: Result<Vec<()>, String> = results.iter().cloned().collect();
409        match result {
410            Ok(_) => Ok(()),
411            Err(errs) => Err(Error::new(ErrorKind::Other, errs)),
412        }
413    }
414}
415
416/// Encode a `FastMessage` into a byte buffer. The `Result` contains a unit type
417/// on success and an error string on failure.
418pub(crate) fn encode_msg(
419    msg: &FastMessage,
420    buf: &mut BytesMut,
421) -> Result<(), String> {
422    let m_msg_type_u8 = msg.msg_type.to_u8();
423    let m_status_u8 = msg.status.to_u8();
424    match (m_msg_type_u8, m_status_u8) {
425        (Some(msg_type_u8), Some(status_u8)) => {
426            // TODO: Handle the error case here!
427            let data_str = serde_json::to_string(&msg.data).unwrap();
428            let data_len = data_str.len();
429            let buf_capacity = buf.capacity();
430            if buf.len() + FP_HEADER_SZ + data_len > buf_capacity {
431                buf.reserve(FP_HEADER_SZ + data_len as usize);
432            }
433            buf.put_u8(FP_VERSION_CURRENT);
434            buf.put_u8(msg_type_u8);
435            buf.put_u8(status_u8);
436            buf.put_u32_be(msg.id);
437            buf.put_u32_be(u32::from(State::<ARC>::calculate(
438                data_str.as_bytes(),
439            )));
440            buf.put_u32_be(data_str.len() as u32);
441            buf.put(data_str);
442            Ok(())
443        }
444        (None, Some(_)) => Err(String::from("Invalid message type")),
445        (Some(_), None) => Err(String::from("Invalid status")),
446        (None, None) => Err(String::from("Invalid message type and status")),
447    }
448}
449
#[cfg(test)]
mod test {
    use super::*;

    use std::iter;

    use quickcheck::{quickcheck, Arbitrary, Gen};
    use rand::distributions::Alphanumeric;
    use rand::seq::SliceRandom;
    use rand::Rng;
    use serde_json::Map;

    /// Generate a random alphanumeric string of `len` characters.
    fn random_string<G: Gen>(g: &mut G, len: usize) -> String {
        iter::repeat(())
            .map(|()| g.sample(Alphanumeric))
            .take(len)
            .collect()
    }

    /// Generate a JSON object with a nested object under "value" and a random
    /// number under "count".
    fn nested_object<G: Gen>(g: &mut G) -> Value {
        let k_len = g.gen::<u8>() as usize;
        let v_len = g.gen::<u8>() as usize;
        let k = random_string(g, k_len);
        let v = random_string(g, v_len);
        let count = g.gen::<u64>();

        let mut inner_obj = Map::new();
        let _ = inner_obj.insert(k, Value::String(v));

        // `insert` returns the previous value for the key, which is `None`
        // for a fresh map, so the two inserts must not be chained through the
        // returned `Option`.
        let mut outer_obj = Map::new();
        let _ =
            outer_obj.insert(String::from("value"), Value::Object(inner_obj));
        let _ = outer_obj.insert(String::from("count"), count.into());
        Value::Object(outer_obj)
    }

    /// A non-zero number of messages to bundle together.
    #[derive(Clone, Debug)]
    struct MessageCount(u8);

    impl Arbitrary for MessageCount {
        fn arbitrary<G: Gen>(g: &mut G) -> MessageCount {
            let mut c = 0;
            while c == 0 {
                c = g.gen::<u8>()
            }

            MessageCount(c)
        }
    }

    impl Arbitrary for FastMessageStatus {
        fn arbitrary<G: Gen>(g: &mut G) -> FastMessageStatus {
            let choices = [
                FastMessageStatus::Data,
                FastMessageStatus::End,
                FastMessageStatus::Error,
            ];

            choices.choose(g).unwrap().clone()
        }
    }

    impl Arbitrary for FastMessageType {
        fn arbitrary<G: Gen>(g: &mut G) -> FastMessageType {
            let choices = [FastMessageType::Json];

            choices.choose(g).unwrap().clone()
        }
    }

    impl Arbitrary for FastMessageMetaData {
        fn arbitrary<G: Gen>(g: &mut G) -> FastMessageMetaData {
            let name = random_string(g, 10);
            FastMessageMetaData::new(name)
        }
    }

    impl Arbitrary for FastMessageData {
        fn arbitrary<G: Gen>(g: &mut G) -> FastMessageData {
            let md = FastMessageMetaData::arbitrary(g);

            let choices = [
                Value::Array(vec![]),
                Value::Object(Map::new()),
                nested_object(g),
                Value::Array(vec![nested_object(g)]),
            ];

            let value = choices.choose(g).unwrap().clone();

            FastMessageData { m: md, d: value }
        }
    }

    impl Arbitrary for FastMessage {
        fn arbitrary<G: Gen>(g: &mut G) -> FastMessage {
            let msg_type = FastMessageType::arbitrary(g);
            let status = FastMessageStatus::arbitrary(g);
            let id = g.gen::<u32>();

            let data = FastMessageData::arbitrary(g);
            let data_str = serde_json::to_string(&data).unwrap();
            // Mirror what `FastMessage::parse` records so that a decoded
            // message compares equal to the generated one.
            let msg_sz = match status {
                FastMessageStatus::End => None,
                _ => Some(FP_OFF_DATA + data_str.len()),
            };

            FastMessage {
                msg_type,
                status,
                id,
                msg_size: msg_sz,
                data,
            }
        }
    }

    quickcheck! {
        fn prop_fast_message_roundtrip(msg: FastMessage) -> bool {
            let mut write_buf = BytesMut::new();
            match encode_msg(&msg, &mut write_buf) {
                Ok(_) => {
                    match FastMessage::parse(&write_buf) {
                        Ok(decoded_msg) => decoded_msg == msg,
                        Err(_) => false
                    }
                },
                Err(_) => false
            }
        }
    }

    quickcheck! {
        fn prop_fast_message_bundling(msg: FastMessage, msg_count: MessageCount) -> bool {
            let mut write_buf = BytesMut::new();
            for _ in 0..msg_count.0 {
                if encode_msg(&msg, &mut write_buf).is_err() {
                    return false;
                }
            }

            // Every copy of the message encodes to the same number of bytes.
            let msg_size = write_buf.len() / msg_count.0 as usize;
            let mut offset = 0;
            for _ in 0..msg_count.0 {
                // Fail on the first bad message so that a later success
                // cannot mask an earlier failure.
                match FastMessage::parse(&write_buf[offset..offset+msg_size]) {
                    Ok(decoded_msg) => {
                        if decoded_msg != msg {
                            return false;
                        }
                    }
                    Err(_) => return false
                }
                offset += msg_size;
            }

            true
        }
    }

    quickcheck! {
        fn prop_fast_message_decoding(msg: FastMessage, msg_count: MessageCount) -> bool {
            let mut write_buf = BytesMut::new();
            let mut fast_msgs: Vec<FastMessage> =
                Vec::with_capacity(msg_count.0 as usize);

            (0..msg_count.0).for_each(|_| {
                fast_msgs.push(msg.clone());
            });

            let mut fast_rpc = FastRpc;
            let encode_res = fast_rpc.encode(fast_msgs, &mut write_buf);

            if encode_res.is_err() {
                return false;
            }

            let decoded_msgs = match fast_rpc.decode(&mut write_buf) {
                Ok(Some(decoded_msgs)) => decoded_msgs,
                Ok(None) | Err(_) => return false,
            };

            if decoded_msgs.len() != msg_count.0 as usize {
                return false;
            }

            // Every decoded message must match, not just the last one.
            decoded_msgs.iter().all(|decoded_msg| *decoded_msg == msg)
        }
    }
}