cassandra_proto/frame/
frame_batch.rs

1use rand;
2
3use super::{Frame, Flag, IntoBytes, AsByte, FromSingleByte, Opcode, Version};
4use crate::query::QueryFlags;
5use crate::types::*;
6use crate::consistency::Consistency;
7use crate::query::QueryValues;
8
/// Body of a batch request frame.
///
/// Its `IntoBytes` implementation writes the fields in this order: batch
/// type, number of queries, the queries themselves, consistency, a single
/// flags byte, and then the optional serial consistency and timestamp.
#[derive(Debug, Clone)]
pub struct BodyReqBatch {
    /// Kind of batch (logged, unlogged or counter).
    pub batch_type: BatchType,
    /// Queries that make up the batch.
    pub queries: Vec<BatchQuery>,
    /// Consistency written right after the queries.
    pub consistency: Consistency,
    /// Flags that are OR-ed together into one flags byte when serialized.
    ///
    /// **IMPORTANT NOTE:** with names flag does not work and should not be used.
    /// https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v4.spec#L413
    pub query_flags: Vec<QueryFlags>,
    /// Serial consistency; it is serialized only when it is `Some`.
    pub serial_consistency: Option<Consistency>,
    /// Timestamp; it is serialized only when it is `Some`.
    pub timestamp: Option<i64>,
}
21
22impl IntoBytes for BodyReqBatch {
23    fn into_cbytes(&self) -> Vec<u8> {
24        let mut bytes = vec![];
25
26        bytes.push(self.batch_type.as_byte());
27
28        bytes.extend_from_slice(to_short(self.queries.len() as i16).as_slice());
29
30        bytes = self.queries.iter().fold(bytes, |mut _bytes, q| {
31            _bytes.extend_from_slice(q.into_cbytes().as_slice());
32            _bytes
33        });
34
35        bytes.extend_from_slice(self.consistency.into_cbytes().as_slice());
36
37        let flag_byte = self.query_flags.iter()
38                            .fold(0, |mut _bytes, f| _bytes | f.as_byte());
39        bytes.push(flag_byte);
40
41        if let Some(ref serial_consistency) = self.serial_consistency {
42            bytes.extend_from_slice(serial_consistency.into_cbytes().as_slice());
43        }
44
45        if let Some(ref timestamp) = self.timestamp {
46            //bytes.extend_from_slice(to_bigint(timestamp.clone()).as_slice());
47            bytes.extend_from_slice(to_bigint(*timestamp).as_slice());
48        }
49
50        bytes
51    }
52}
53
/// Batch type.
///
/// This is a plain, fieldless enum, so it is cheap to copy and supports full
/// equality (`Copy` and `Eq` in addition to `Clone` and `PartialEq`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchType {
    /// The batch will be "logged". This is equivalent to a
    /// normal CQL3 batch statement.
    Logged,
    /// The batch will be "unlogged".
    Unlogged,
    /// The batch will be a "counter" batch (and non-counter
    /// statements will be rejected).
    Counter,
}
66
67impl FromSingleByte for BatchType {
68    fn from_byte(byte: u8) -> BatchType {
69        match byte {
70            0 => BatchType::Logged,
71            1 => BatchType::Unlogged,
72            2 => BatchType::Counter,
73            _ => unreachable!(),
74        }
75    }
76}
77
78impl AsByte for BatchType {
79    fn as_byte(&self) -> u8 {
80        match *self {
81            BatchType::Logged => 0,
82            BatchType::Unlogged => 1,
83            BatchType::Counter => 2,
84        }
85    }
86}
87
/// The structure that represents a query to be batched.
#[derive(Debug, Clone)]
pub struct BatchQuery {
    /// Indicates whether the query was prepared.
    // NOTE(review): presumably expected to be `true` exactly when `subject`
    // is `BatchQuerySubj::PreparedId` — confirm against callers.
    pub is_prepared: bool,
    /// Contains either the id of a prepared query or the query itself.
    pub subject: BatchQuerySubj,
    /// Values for the query.
    ///
    /// Each <value_i> may be preceded by an optional name. The name must be
    /// present if and only if the 0x40 flag is provided for the batch.
    /// **Important note:** this feature does not work and should not be
    /// used. It is specified in a way that makes it impossible for the server
    /// to implement. This will be fixed in a future version of the native
    /// protocol. See https://issues.apache.org/jira/browse/CASSANDRA-10246 for
    /// more details.
    pub values: QueryValues,
}
104
/// The subject of a batched query: either the id of a prepared query or a
/// CQL string.
#[derive(Debug, Clone)]
pub enum BatchQuerySubj {
    /// Id of a prepared query.
    PreparedId(CBytesShort),
    /// CQL query string.
    QueryString(CStringLong),
}
111
112impl IntoBytes for BatchQuery {
113    fn into_cbytes(&self) -> Vec<u8> {
114        let mut bytes = vec![];
115
116        // kind
117        if self.is_prepared {
118            bytes.push(1);
119        } else {
120            bytes.push(0);
121        }
122
123        match self.subject {
124            BatchQuerySubj::PreparedId(ref s) => {
125                bytes.extend_from_slice(s.into_cbytes().as_slice());
126            }
127            BatchQuerySubj::QueryString(ref s) => {
128                bytes.extend_from_slice(s.into_cbytes().as_slice());
129            }
130        }
131
132        bytes.extend_from_slice(to_short(self.values.len() as i16).as_slice());
133
134        bytes.extend_from_slice(self.values.into_cbytes().as_slice());
135
136        bytes
137    }
138}
139
140impl Frame {
141    /// **Note:** This function should be used internally for building query request frames.
142    pub fn new_req_batch(query: BodyReqBatch, flags: Vec<Flag>) -> Frame {
143        let version = Version::Request;
144        let stream = rand::random::<u16>();
145        let opcode = Opcode::Batch;
146
147        Frame { version: version,
148                flags: flags,
149                stream: stream,
150                opcode: opcode,
151                body: query.into_cbytes(),
152                // for request frames it's always None
153                tracing_id: None,
154                warnings: vec![], }
155    }
156}