cassandra_proto/frame/
parser.rs

1use std::cell::RefCell;
2use std::io::{Cursor, Read};
3
4use super::*;
5use crate::compression::Compressor;
6use crate::error;
7use crate::frame::frame_response::ResponseBody;
8use crate::frame::FromCursor;
9use crate::types::data_serialization_types::decode_timeuuid;
10use crate::types::{from_bytes, from_u16_bytes, CStringList, UUID_LEN};
11
12pub fn parse_frame<E>(
13    cursor_cell: &RefCell<dyn Read>,
14    compressor: &dyn Compressor<CompressorError = E>,
15) -> error::Result<Frame>
16where
17    E: std::error::Error,
18{
19    let mut version_bytes = [0; Version::BYTE_LENGTH];
20    let mut flag_bytes = [0; Flag::BYTE_LENGTH];
21    let mut opcode_bytes = [0; Opcode::BYTE_LENGTH];
22    let mut stream_bytes = [0; STREAM_LEN];
23    let mut length_bytes = [0; LENGTH_LEN];
24    let mut cursor = cursor_cell.borrow_mut();
25
26    // NOTE: order of reads matters
27    cursor.read_exact(&mut version_bytes)?;
28    cursor.read_exact(&mut flag_bytes)?;
29    cursor.read_exact(&mut stream_bytes)?;
30    cursor.read_exact(&mut opcode_bytes)?;
31    cursor.read_exact(&mut length_bytes)?;
32
33    let version = Version::from(version_bytes.to_vec());
34    let flags = Flag::get_collection(flag_bytes[0]);
35    let stream = from_u16_bytes(&stream_bytes);
36    let opcode = Opcode::from(opcode_bytes[0]);
37    let length = from_bytes(&length_bytes) as usize;
38
39    let mut body_bytes = Vec::with_capacity(length);
40    unsafe {
41        body_bytes.set_len(length);
42    }
43
44    cursor.read_exact(&mut body_bytes)?;
45
46    let full_body = if flags.iter().any(|flag| flag == &Flag::Compression) {
47        compressor
48            .decode(body_bytes)
49            .map_err(|err| error::Error::from(err.description()))?
50    } else {
51        body_bytes
52    };
53
54    // Use cursor to get tracing id, warnings and actual body
55    let mut body_cursor = Cursor::new(full_body.as_slice());
56
57    let tracing_id = if flags.iter().any(|flag| flag == &Flag::Tracing) {
58        let mut tracing_bytes = Vec::with_capacity(UUID_LEN);
59        unsafe {
60            tracing_bytes.set_len(UUID_LEN);
61        }
62        body_cursor.read_exact(&mut tracing_bytes)?;
63
64        decode_timeuuid(tracing_bytes.as_slice()).ok()
65    } else {
66        None
67    };
68
69    let warnings = if flags.iter().any(|flag| flag == &Flag::Warning) {
70        CStringList::from_cursor(&mut body_cursor)?.into_plain()
71    } else {
72        vec![]
73    };
74
75    let mut body = vec![];
76
77    body_cursor.read_to_end(&mut body)?;
78
79    let frame = Frame {
80        version: version,
81        flags: flags,
82        opcode: opcode,
83        stream: stream,
84        body: body,
85        tracing_id: tracing_id,
86        warnings: warnings,
87    };
88
89    convert_frame_into_result(frame)
90}
91
92fn convert_frame_into_result(frame: Frame) -> error::Result<Frame> {
93    match frame.opcode {
94        Opcode::Error => frame.get_body().and_then(|err| match err {
95            ResponseBody::Error(err) => Err(error::Error::Server(err)),
96            _ => unreachable!(),
97        }),
98        _ => Ok(frame),
99    }
100}