cassandra_proto/frame/
parser_async.rs

1use std::io::{Cursor, Read};
2
3use super::*;
4use crate::compression::Compressor;
5use crate::error;
6use crate::frame::frame_response::ResponseBody;
7use crate::frame::FromCursor;
8use crate::types::data_serialization_types::decode_timeuuid;
9use crate::types::{from_bytes, from_u16_bytes, CStringList, UUID_LEN};
10
// Early-exit helper for incremental parsing.
//
// `$res` is a fallible read result holding the number of bytes obtained and
// `$expected_len` is the number of bytes that were needed. An `Err` in `$res`
// is propagated with `?`; if fewer bytes than needed were obtained, the
// enclosing function returns `Ok(None)`. Otherwise execution continues.
macro_rules! proceed_if_filled {
  ($res: expr, $expected_len: expr) => {{
    let required = $expected_len;
    let filled = $res?;
    if filled < required {
      return Ok(None);
    }
  }};
}
18
19pub fn parse_frame_async<E, C>(
20  cursor: &mut C,
21  compressor: &dyn Compressor<CompressorError = E>,
22) -> error::Result<Option<Frame>>
23where
24  E: std::error::Error,
25  C: Read,
26{
27  let mut version_bytes = [0; Version::BYTE_LENGTH];
28  let mut flag_bytes = [0; Flag::BYTE_LENGTH];
29  let mut opcode_bytes = [0; Opcode::BYTE_LENGTH];
30  let mut stream_bytes = [0; STREAM_LEN];
31  let mut length_bytes = [0; LENGTH_LEN];
32
33  // NOTE: order of reads matters
34  proceed_if_filled!(cursor.read(&mut version_bytes), Version::BYTE_LENGTH);
35  proceed_if_filled!(cursor.read(&mut flag_bytes), Flag::BYTE_LENGTH);
36  proceed_if_filled!(cursor.read(&mut stream_bytes), STREAM_LEN);
37  proceed_if_filled!(cursor.read(&mut opcode_bytes), Opcode::BYTE_LENGTH);
38  proceed_if_filled!(cursor.read(&mut length_bytes), LENGTH_LEN);
39
40  let version = Version::from(version_bytes.to_vec());
41  let flags = Flag::get_collection(flag_bytes[0]);
42  let stream = from_u16_bytes(&stream_bytes);
43  let opcode = Opcode::from(opcode_bytes[0]);
44  let length = from_bytes(&length_bytes) as usize;
45
46  let mut body_bytes = Vec::with_capacity(length);
47  unsafe {
48    body_bytes.set_len(length);
49  }
50
51  proceed_if_filled!(cursor.read(&mut body_bytes), length);
52
53  let full_body = if flags.iter().any(|flag| flag == &Flag::Compression) {
54    compressor
55      .decode(body_bytes)
56      .map_err(|err| error::Error::from(err.description()))?
57  } else {
58    body_bytes
59  };
60
61  // Use cursor to get tracing id, warnings and actual body
62  let mut body_cursor = Cursor::new(full_body.as_slice());
63
64  let tracing_id = if flags.iter().any(|flag| flag == &Flag::Tracing) {
65    let mut tracing_bytes = Vec::with_capacity(UUID_LEN);
66    unsafe {
67      tracing_bytes.set_len(UUID_LEN);
68    }
69    body_cursor.read_exact(&mut tracing_bytes)?;
70
71    decode_timeuuid(tracing_bytes.as_slice()).ok()
72  } else {
73    None
74  };
75
76  let warnings = if flags.iter().any(|flag| flag == &Flag::Warning) {
77    CStringList::from_cursor(&mut body_cursor)?.into_plain()
78  } else {
79    vec![]
80  };
81
82  let mut body = vec![];
83
84  body_cursor.read_to_end(&mut body)?;
85
86  let frame = Frame {
87    version: version,
88    flags: flags,
89    opcode: opcode,
90    stream: stream,
91    body: body,
92    tracing_id: tracing_id,
93    warnings: warnings,
94  };
95
96  Ok(Some(frame))
97}
98
99pub fn convert_frame_into_result(frame: Frame) -> error::Result<Frame> {
100  match frame.opcode {
101    Opcode::Error => frame.get_body().and_then(|err| match err {
102      ResponseBody::Error(err) => Err(error::Error::Server(err)),
103      _ => unreachable!(),
104    }),
105    _ => Ok(frame),
106  }
107}