1use std::io::{Error, ErrorKind};
9use std::sync::atomic::AtomicUsize;
10use std::time::{SystemTime, UNIX_EPOCH};
11use std::{io, str, usize};
12
13use byteorder::{BigEndian, ByteOrder};
14use bytes::{BufMut, BytesMut};
15use crc16::*;
16use num::{FromPrimitive, ToPrimitive};
17use num_derive::{FromPrimitive, ToPrimitive};
18use serde_derive::{Deserialize, Serialize};
19use serde_json::Value;
20use tokio_io::_tokio_codec::{Decoder, Encoder};
21
// Byte offsets of the header fields inside a serialized message. Byte 0
// carries the protocol version, followed by a one-byte type, a one-byte
// status and three 4-byte big-endian fields (message id, CRC, data length).
const FP_OFF_TYPE: usize = 1;
const FP_OFF_STATUS: usize = FP_OFF_TYPE + 1;
const FP_OFF_MSGID: usize = FP_OFF_STATUS + 1;
const FP_OFF_CRC: usize = FP_OFF_MSGID + 4;
const FP_OFF_DATALEN: usize = FP_OFF_CRC + 4;
const FP_OFF_DATA: usize = FP_OFF_DATALEN + 4;

/// Size in bytes of the fixed message header, i.e. the offset at which the
/// data payload starts.
pub const FP_HEADER_SZ: usize = FP_OFF_DATA;

// Protocol version byte written at the start of every encoded message.
const FP_VERSION_2: u8 = 2;
const FP_VERSION_CURRENT: u8 = FP_VERSION_2;
34
/// Generator of message identifiers, exposed as an endless iterator.
///
/// Identifiers start at zero, increase by one on every call to `next` and
/// wrap back to zero after `usize::max_value() - 2` has been handed out.
#[derive(Default)]
pub struct FastMessageId(AtomicUsize);

impl FastMessageId {
    /// Creates a generator whose first identifier is `0`.
    pub fn new() -> Self {
        Self::default()
    }
}

impl Iterator for FastMessageId {
    type Item = usize;

    /// Returns the current identifier and advances the counter.
    ///
    /// This never returns `None`.
    fn next(&mut self) -> Option<Self::Item> {
        // `&mut self` guarantees exclusive access, so the counter can be
        // updated through a plain mutable reference.
        let slot = self.0.get_mut();
        let following = (*slot + 1) % (usize::max_value() - 1);

        Some(std::mem::replace(slot, following))
    }
}
62
/// Errors that can occur while parsing a message from a byte buffer.
#[derive(Debug)]
pub enum FastParseError {
    /// The buffer does not yet hold a complete message. The payload is the
    /// number of bytes that were available.
    NotEnoughBytes(usize),
    /// Any other failure, carried as an I/O error.
    IOError(Error),
}

impl From<io::Error> for FastParseError {
    /// Wraps an I/O error in the `IOError` variant.
    fn from(error: io::Error) -> Self {
        FastParseError::IOError(error)
    }
}

impl From<FastParseError> for Error {
    /// Converts a parse error into an I/O error: a wrapped I/O error is
    /// returned unchanged, a short buffer becomes an `ErrorKind::Other`.
    fn from(pfr: FastParseError) -> Self {
        match pfr {
            FastParseError::IOError(inner) => inner,
            FastParseError::NotEnoughBytes(_) => Error::new(
                ErrorKind::Other,
                "Unable to parse message: not enough bytes",
            ),
        }
    }
}
87
88#[derive(Debug, Deserialize, Serialize)]
91pub struct FastMessageServerError {
92 pub name: String,
93 pub message: String,
94}
95
96impl FastMessageServerError {
97 pub fn new(name: &str, message: &str) -> Self {
98 FastMessageServerError {
99 name: String::from(name),
100 message: String::from(message),
101 }
102 }
103}
104
105impl From<FastMessageServerError> for Error {
106 fn from(err: FastMessageServerError) -> Self {
107 Error::new(ErrorKind::Other, format!("{}: {}", err.name, err.message))
108 }
109}
110
111#[derive(Debug, FromPrimitive, ToPrimitive, PartialEq, Clone)]
114pub enum FastMessageType {
115 Json = 1,
116}
117
118#[derive(Debug, FromPrimitive, ToPrimitive, PartialEq, Clone)]
120pub enum FastMessageStatus {
121 Data = 1,
122 End = 2,
123 Error = 3,
124}
125
126pub struct FastMessageHeader {
128 msg_type: FastMessageType,
130 status: FastMessageStatus,
132 id: u32,
134 crc: u32,
136 data_len: usize,
138}
139
140#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
143pub struct FastMessageMetaData {
144 pub uts: u64,
145 pub name: String,
146}
147
148impl FastMessageMetaData {
149 pub fn new(n: String) -> FastMessageMetaData {
150 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
151 let now_micros =
152 now.as_secs() * 1_000_000 + u64::from(now.subsec_micros());
153
154 FastMessageMetaData {
155 uts: now_micros,
156 name: n,
157 }
158 }
159}
160
161#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
163pub struct FastMessageData {
164 pub m: FastMessageMetaData,
165 pub d: Value,
166}
167
168impl FastMessageData {
169 pub fn new(n: String, d: Value) -> FastMessageData {
170 FastMessageData {
171 m: FastMessageMetaData::new(n),
172 d,
173 }
174 }
175}
176
177#[derive(Debug, Clone)]
179pub struct FastMessage {
180 pub msg_type: FastMessageType,
182 pub status: FastMessageStatus,
184 pub id: u32,
186 pub msg_size: Option<usize>,
188 pub data: FastMessageData,
190}
191
192impl PartialEq for FastMessage {
193 fn eq(&self, other: &FastMessage) -> bool {
194 self.msg_type == other.msg_type
195 && self.status == other.status
196 && self.id == other.id
197 && self.msg_size == other.msg_size
198 && self.data == other.data
199 }
200}
201
202impl FastMessage {
203 pub fn parse(buf: &[u8]) -> Result<FastMessage, FastParseError> {
206 FastMessage::check_buffer_size(buf)?;
207 let header = FastMessage::parse_header(buf)?;
208
209 FastMessage::validate_data_length(buf, header.data_len)?;
210 let raw_data = &buf[FP_OFF_DATA..FP_OFF_DATA + header.data_len];
211 FastMessage::validate_crc(raw_data, header.crc)?;
212 let data = FastMessage::parse_data(raw_data)?;
213
214 let msg_size = match header.status {
215 FastMessageStatus::End => None,
216 _ => Some(FP_OFF_DATA + header.data_len),
217 };
218
219 Ok(FastMessage {
220 msg_type: header.msg_type,
221 status: header.status,
222 id: header.id,
223 msg_size,
224 data,
225 })
226 }
227
228 pub fn check_buffer_size(buf: &[u8]) -> Result<(), FastParseError> {
231 if buf.len() < FP_HEADER_SZ {
232 Err(FastParseError::NotEnoughBytes(buf.len()))
233 } else {
234 Ok(())
235 }
236 }
237
238 pub fn parse_header(
242 buf: &[u8],
243 ) -> Result<FastMessageHeader, FastParseError> {
244 let msg_type =
245 FromPrimitive::from_u8(buf[FP_OFF_TYPE]).ok_or_else(|| {
246 let msg = "Failed to parse message type";
247 FastParseError::IOError(Error::new(ErrorKind::Other, msg))
248 })?;
249 let status =
250 FromPrimitive::from_u8(buf[FP_OFF_STATUS]).ok_or_else(|| {
251 let msg = "Failed to parse message status";
252 FastParseError::IOError(Error::new(ErrorKind::Other, msg))
253 })?;
254 let msg_id = BigEndian::read_u32(&buf[FP_OFF_MSGID..FP_OFF_MSGID + 4]);
255 let expected_crc =
256 BigEndian::read_u32(&buf[FP_OFF_CRC..FP_OFF_CRC + 4]);
257 let data_len =
258 BigEndian::read_u32(&buf[FP_OFF_DATALEN..FP_OFF_DATALEN + 4])
259 as usize;
260
261 Ok(FastMessageHeader {
262 msg_type,
263 status,
264 id: msg_id,
265 crc: expected_crc,
266 data_len,
267 })
268 }
269
270 fn validate_data_length(
271 buf: &[u8],
272 data_length: usize,
273 ) -> Result<(), FastParseError> {
274 if buf.len() < (FP_HEADER_SZ + data_length) {
275 Err(FastParseError::NotEnoughBytes(buf.len()))
276 } else {
277 Ok(())
278 }
279 }
280
281 fn validate_crc(data_buf: &[u8], crc: u32) -> Result<(), FastParseError> {
282 let calculated_crc = u32::from(State::<ARC>::calculate(data_buf));
283 if crc != calculated_crc {
284 let msg = "Calculated CRC does not match the provided CRC";
285 Err(FastParseError::IOError(Error::new(ErrorKind::Other, msg)))
286 } else {
287 Ok(())
288 }
289 }
290
291 fn parse_data(data_buf: &[u8]) -> Result<FastMessageData, FastParseError> {
292 match str::from_utf8(data_buf) {
293 Ok(data_str) => serde_json::from_str(data_str).map_err(|_e| {
294 let msg = "Failed to parse data payload as JSON";
295 FastParseError::IOError(Error::new(ErrorKind::Other, msg))
296 }),
297 Err(_) => {
298 let msg = "Failed to parse data payload as UTF-8";
299 Err(FastParseError::IOError(Error::new(ErrorKind::Other, msg)))
300 }
301 }
302 }
303
304 pub fn data(msg_id: u32, data: FastMessageData) -> FastMessage {
307 FastMessage {
308 msg_type: FastMessageType::Json,
309 status: FastMessageStatus::Data,
310 id: msg_id,
311 msg_size: None,
312 data,
313 }
314 }
315
316 pub fn end(msg_id: u32, method: String) -> FastMessage {
320 FastMessage {
321 msg_type: FastMessageType::Json,
322 status: FastMessageStatus::End,
323 id: msg_id,
324 msg_size: None,
325 data: FastMessageData::new(method, Value::Array(vec![])),
326 }
327 }
328
329 pub fn error(msg_id: u32, data: FastMessageData) -> FastMessage {
332 FastMessage {
333 msg_type: FastMessageType::Json,
334 status: FastMessageStatus::Error,
335 id: msg_id,
336 msg_size: None,
337 data,
338 }
339 }
340}
341
342pub struct FastRpc;
344
345impl Decoder for FastRpc {
346 type Item = Vec<FastMessage>;
347 type Error = Error;
348
349 fn decode(
350 &mut self,
351 buf: &mut BytesMut,
352 ) -> Result<Option<Self::Item>, Error> {
353 let mut msgs: Self::Item = Vec::new();
354 let mut done = false;
355
356 while !done && !buf.is_empty() {
357 if msgs.len() + 1 > msgs.capacity() {
359 msgs.reserve(1);
360 }
361
362 match FastMessage::parse(&buf) {
363 Ok(parsed_msg) => {
364 let data_str =
366 serde_json::to_string(&parsed_msg.data).unwrap();
367 let data_len = data_str.len();
368 buf.advance(FP_HEADER_SZ + data_len);
369 msgs.push(parsed_msg);
370 Ok(())
371 }
372 Err(FastParseError::NotEnoughBytes(_)) => {
373 done = true;
377 Ok(())
378 }
379 Err(err) => {
380 let msg = format!(
381 "failed to parse Fast request: {}",
382 Error::from(err)
383 );
384 Err(Error::new(ErrorKind::Other, msg))
385 }
386 }?
387 }
388
389 if msgs.is_empty() {
390 Ok(None)
391 } else {
392 Ok(Some(msgs))
393 }
394 }
395}
396
397impl Encoder for FastRpc {
398 type Item = Vec<FastMessage>;
399 type Error = io::Error;
401 fn encode(
402 &mut self,
403 item: Self::Item,
404 buf: &mut BytesMut,
405 ) -> Result<(), io::Error> {
406 let results: Vec<Result<(), String>> =
407 item.iter().map(|x| encode_msg(x, buf)).collect();
408 let result: Result<Vec<()>, String> = results.iter().cloned().collect();
409 match result {
410 Ok(_) => Ok(()),
411 Err(errs) => Err(Error::new(ErrorKind::Other, errs)),
412 }
413 }
414}
415
416pub(crate) fn encode_msg(
419 msg: &FastMessage,
420 buf: &mut BytesMut,
421) -> Result<(), String> {
422 let m_msg_type_u8 = msg.msg_type.to_u8();
423 let m_status_u8 = msg.status.to_u8();
424 match (m_msg_type_u8, m_status_u8) {
425 (Some(msg_type_u8), Some(status_u8)) => {
426 let data_str = serde_json::to_string(&msg.data).unwrap();
428 let data_len = data_str.len();
429 let buf_capacity = buf.capacity();
430 if buf.len() + FP_HEADER_SZ + data_len > buf_capacity {
431 buf.reserve(FP_HEADER_SZ + data_len as usize);
432 }
433 buf.put_u8(FP_VERSION_CURRENT);
434 buf.put_u8(msg_type_u8);
435 buf.put_u8(status_u8);
436 buf.put_u32_be(msg.id);
437 buf.put_u32_be(u32::from(State::<ARC>::calculate(
438 data_str.as_bytes(),
439 )));
440 buf.put_u32_be(data_str.len() as u32);
441 buf.put(data_str);
442 Ok(())
443 }
444 (None, Some(_)) => Err(String::from("Invalid message type")),
445 (Some(_), None) => Err(String::from("Invalid status")),
446 (None, None) => Err(String::from("Invalid message type and status")),
447 }
448}
449
#[cfg(test)]
mod test {
    use super::*;

    use std::iter;

    use quickcheck::{quickcheck, Arbitrary, Gen};
    use rand::distributions::Alphanumeric;
    use rand::seq::SliceRandom;
    use rand::Rng;
    use serde_json::Map;

    /// Generates a random alphanumeric string of exactly `len` characters.
    fn random_string<G: Gen>(g: &mut G, len: usize) -> String {
        iter::repeat(())
            .map(|()| g.sample(Alphanumeric))
            .take(len)
            .collect()
    }

    /// Generates `{"value": {<random key>: <random string>}, "count": <u64>}`.
    fn nested_object<G: Gen>(g: &mut G) -> Value {
        let k_len = g.gen::<u8>() as usize;
        let v_len = g.gen::<u8>() as usize;
        let k = random_string(g, k_len);
        let v = random_string(g, v_len);
        let count = g.gen::<u64>();

        let mut inner_obj = Map::new();
        inner_obj.insert(k, Value::String(v));

        // `insert` returns the previous value for the key, which is `None`
        // for a fresh map, so the two inserts must not be chained through
        // `and_then` or the second one would never run.
        let mut outer_obj = Map::new();
        outer_obj.insert(String::from("value"), Value::Object(inner_obj));
        outer_obj.insert(String::from("count"), count.into());

        Value::Object(outer_obj)
    }

    /// A non-zero number of messages to encode in one batch.
    #[derive(Clone, Debug)]
    struct MessageCount(u8);

    impl Arbitrary for MessageCount {
        fn arbitrary<G: Gen>(g: &mut G) -> MessageCount {
            // Draw until the value is non-zero; the properties divide by it.
            let mut c = 0;
            while c == 0 {
                c = g.gen::<u8>()
            }

            MessageCount(c)
        }
    }

    impl Arbitrary for FastMessageStatus {
        fn arbitrary<G: Gen>(g: &mut G) -> FastMessageStatus {
            let choices = [
                FastMessageStatus::Data,
                FastMessageStatus::End,
                FastMessageStatus::Error,
            ];

            choices.choose(g).unwrap().clone()
        }
    }

    impl Arbitrary for FastMessageType {
        fn arbitrary<G: Gen>(g: &mut G) -> FastMessageType {
            let choices = [FastMessageType::Json];

            choices.choose(g).unwrap().clone()
        }
    }

    impl Arbitrary for FastMessageMetaData {
        fn arbitrary<G: Gen>(g: &mut G) -> FastMessageMetaData {
            let name = random_string(g, 10);
            FastMessageMetaData::new(name)
        }
    }

    impl Arbitrary for FastMessageData {
        fn arbitrary<G: Gen>(g: &mut G) -> FastMessageData {
            let md = FastMessageMetaData::arbitrary(g);

            let choices = [
                Value::Array(vec![]),
                Value::Object(Map::new()),
                nested_object(g),
                Value::Array(vec![nested_object(g)]),
            ];

            let value = choices.choose(g).unwrap().clone();

            FastMessageData { m: md, d: value }
        }
    }

    impl Arbitrary for FastMessage {
        fn arbitrary<G: Gen>(g: &mut G) -> FastMessage {
            let msg_type = FastMessageType::arbitrary(g);
            let status = FastMessageStatus::arbitrary(g);
            let id = g.gen::<u32>();

            let data = FastMessageData::arbitrary(g);
            let data_str = serde_json::to_string(&data).unwrap();
            // Mirror `FastMessage::parse`, which reports no size for `End`.
            let msg_sz = match status {
                FastMessageStatus::End => None,
                _ => Some(FP_OFF_DATA + data_str.len()),
            };

            FastMessage {
                msg_type,
                status,
                id,
                msg_size: msg_sz,
                data,
            }
        }
    }

    quickcheck! {
        // Encoding a message and parsing the bytes yields the same message.
        fn prop_fast_message_roundtrip(msg: FastMessage) -> bool {
            let mut write_buf = BytesMut::new();
            if encode_msg(&msg, &mut write_buf).is_err() {
                return false;
            }

            match FastMessage::parse(&write_buf) {
                Ok(decoded_msg) => decoded_msg == msg,
                Err(_) => false,
            }
        }
    }

    quickcheck! {
        // Several copies of a message written back to back can each be
        // parsed from their own slice of the buffer.
        fn prop_fast_message_bundling(msg: FastMessage, msg_count: MessageCount) -> bool {
            let count = usize::from(msg_count.0);
            let mut write_buf = BytesMut::new();

            for _ in 0..count {
                if encode_msg(&msg, &mut write_buf).is_err() {
                    return false;
                }
            }

            // Every copy encodes to the same number of bytes.
            let msg_size = write_buf.len() / count;

            // Every copy must decode correctly, not just the last one.
            (0..count).all(|i| {
                let start = i * msg_size;
                match FastMessage::parse(&write_buf[start..start + msg_size]) {
                    Ok(decoded_msg) => decoded_msg == msg,
                    Err(_) => false,
                }
            })
        }
    }

    quickcheck! {
        // A batch pushed through the codec's encoder comes back unchanged
        // from its decoder.
        fn prop_fast_message_decoding(msg: FastMessage, msg_count: MessageCount) -> bool {
            let count = usize::from(msg_count.0);
            let mut write_buf = BytesMut::new();
            let fast_msgs: Vec<FastMessage> = vec![msg.clone(); count];

            let mut fast_rpc = FastRpc;
            if fast_rpc.encode(fast_msgs, &mut write_buf).is_err() {
                return false;
            }

            match fast_rpc.decode(&mut write_buf) {
                // Every decoded message must match, not just the last one.
                Ok(Some(decoded_msgs)) => {
                    decoded_msgs.len() == count
                        && decoded_msgs.iter().all(|decoded| *decoded == msg)
                }
                _ => false,
            }
        }
    }
}