cassandra_proto/frame/
parser.rs1use std::cell::RefCell;
2use std::io::{Cursor, Read};
3
4use super::*;
5use crate::compression::Compressor;
6use crate::error;
7use crate::frame::frame_response::ResponseBody;
8use crate::frame::FromCursor;
9use crate::types::data_serialization_types::decode_timeuuid;
10use crate::types::{from_bytes, from_u16_bytes, CStringList, UUID_LEN};
11
12pub fn parse_frame<E>(
13 cursor_cell: &RefCell<dyn Read>,
14 compressor: &dyn Compressor<CompressorError = E>,
15) -> error::Result<Frame>
16where
17 E: std::error::Error,
18{
19 let mut version_bytes = [0; Version::BYTE_LENGTH];
20 let mut flag_bytes = [0; Flag::BYTE_LENGTH];
21 let mut opcode_bytes = [0; Opcode::BYTE_LENGTH];
22 let mut stream_bytes = [0; STREAM_LEN];
23 let mut length_bytes = [0; LENGTH_LEN];
24 let mut cursor = cursor_cell.borrow_mut();
25
26 cursor.read_exact(&mut version_bytes)?;
28 cursor.read_exact(&mut flag_bytes)?;
29 cursor.read_exact(&mut stream_bytes)?;
30 cursor.read_exact(&mut opcode_bytes)?;
31 cursor.read_exact(&mut length_bytes)?;
32
33 let version = Version::from(version_bytes.to_vec());
34 let flags = Flag::get_collection(flag_bytes[0]);
35 let stream = from_u16_bytes(&stream_bytes);
36 let opcode = Opcode::from(opcode_bytes[0]);
37 let length = from_bytes(&length_bytes) as usize;
38
39 let mut body_bytes = Vec::with_capacity(length);
40 unsafe {
41 body_bytes.set_len(length);
42 }
43
44 cursor.read_exact(&mut body_bytes)?;
45
46 let full_body = if flags.iter().any(|flag| flag == &Flag::Compression) {
47 compressor
48 .decode(body_bytes)
49 .map_err(|err| error::Error::from(err.description()))?
50 } else {
51 body_bytes
52 };
53
54 let mut body_cursor = Cursor::new(full_body.as_slice());
56
57 let tracing_id = if flags.iter().any(|flag| flag == &Flag::Tracing) {
58 let mut tracing_bytes = Vec::with_capacity(UUID_LEN);
59 unsafe {
60 tracing_bytes.set_len(UUID_LEN);
61 }
62 body_cursor.read_exact(&mut tracing_bytes)?;
63
64 decode_timeuuid(tracing_bytes.as_slice()).ok()
65 } else {
66 None
67 };
68
69 let warnings = if flags.iter().any(|flag| flag == &Flag::Warning) {
70 CStringList::from_cursor(&mut body_cursor)?.into_plain()
71 } else {
72 vec![]
73 };
74
75 let mut body = vec![];
76
77 body_cursor.read_to_end(&mut body)?;
78
79 let frame = Frame {
80 version: version,
81 flags: flags,
82 opcode: opcode,
83 stream: stream,
84 body: body,
85 tracing_id: tracing_id,
86 warnings: warnings,
87 };
88
89 convert_frame_into_result(frame)
90}
91
92fn convert_frame_into_result(frame: Frame) -> error::Result<Frame> {
93 match frame.opcode {
94 Opcode::Error => frame.get_body().and_then(|err| match err {
95 ResponseBody::Error(err) => Err(error::Error::Server(err)),
96 _ => unreachable!(),
97 }),
98 _ => Ok(frame),
99 }
100}