Skip to main content

scylla_cql/frame/request/
mod.rs

1//! CQL requests sent by the client.
2
3pub mod auth_response;
4pub mod batch;
5pub mod execute;
6pub mod options;
7pub mod prepare;
8pub mod query;
9pub mod register;
10pub mod startup;
11
12use batch::BatchTypeParseError;
13use thiserror::Error;
14
15use crate::Consistency;
16use crate::frame::protocol_features::ProtocolFeatures;
17use crate::frame::request::execute::ExecuteV2;
18use crate::serialize::row::SerializedValues;
19use bytes::Bytes;
20
21pub use auth_response::AuthResponse;
22pub use batch::Batch;
23#[expect(deprecated)]
24pub use execute::Execute;
25pub use options::Options;
26pub use prepare::Prepare;
27pub use query::Query;
28pub use startup::Startup;
29
30use self::batch::BatchStatement;
31
32use super::TryFromPrimitiveError;
33use super::frame_errors::{CqlRequestSerializationError, LowLevelDeserializationError};
34use super::types::SerialConsistency;
35
36/// Possible requests sent by the client.
37// Why is it distinct from [RequestOpcode]?
38// TODO(2.0): merge this with `RequestOpcode`.
39#[derive(Debug, Copy, Clone)]
40#[non_exhaustive]
41pub enum CqlRequestKind {
42    /// Initialize the connection. The server will respond by either a READY message
43    /// (in which case the connection is ready for queries) or an AUTHENTICATE message
44    /// (in which case credentials will need to be provided using AUTH_RESPONSE).
45    ///
46    /// This must be the first message of the connection, except for OPTIONS that can
47    /// be sent before to find out the options supported by the server. Once the
48    /// connection has been initialized, a client should not send any more STARTUP
49    /// messages.
50    Startup,
51
52    /// Answers a server authentication challenge.
53    ///
54    /// Authentication in the protocol is SASL based. The server sends authentication
55    /// challenges (a bytes token) to which the client answers with this message. Those
56    /// exchanges continue until the server accepts the authentication by sending a
57    /// AUTH_SUCCESS message after a client AUTH_RESPONSE. Note that the exchange
58    /// begins with the client sending an initial AUTH_RESPONSE in response to a
59    /// server AUTHENTICATE request.
60    ///
61    /// The response to a AUTH_RESPONSE is either a follow-up AUTH_CHALLENGE message,
62    /// an AUTH_SUCCESS message or an ERROR message.
63    AuthResponse,
64
65    /// Asks the server to return which STARTUP options are supported. The server
66    /// will respond with a SUPPORTED message.
67    Options,
68
69    /// Performs a CQL query, i.e., executes an unprepared statement.
70    /// The server will respond to a QUERY message with a RESULT message, the content
71    /// of which depends on the query.
72    Query,
73
74    /// Prepares a query for later execution (through EXECUTE).
75    /// The server will respond with a RESULT::Prepared message.
76    Prepare,
77
78    /// Executes a prepared query.
79    /// The response from the server will be a RESULT message.
80    Execute,
81
82    /// Allows executing a list of queries (prepared or not) as a batch (note that
83    /// only DML statements are accepted in a batch).
84    /// The server will respond with a RESULT message.
85    Batch,
86
87    /// Register this connection to receive some types of events.
88    /// The response to a REGISTER message will be a READY message.
89    Register,
90}
91
92impl std::fmt::Display for CqlRequestKind {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        let kind_str = match self {
95            CqlRequestKind::Startup => "STARTUP",
96            CqlRequestKind::AuthResponse => "AUTH_RESPONSE",
97            CqlRequestKind::Options => "OPTIONS",
98            CqlRequestKind::Query => "QUERY",
99            CqlRequestKind::Prepare => "PREPARE",
100            CqlRequestKind::Execute => "EXECUTE",
101            CqlRequestKind::Batch => "BATCH",
102            CqlRequestKind::Register => "REGISTER",
103        };
104
105        f.write_str(kind_str)
106    }
107}
108
109/// Opcode of a request, used to identify the request type in a CQL frame.
110#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
111#[repr(u8)]
112pub enum RequestOpcode {
113    /// See [CqlRequestKind::Startup].
114    Startup = 0x01,
115    /// See [CqlRequestKind::Options].
116    Options = 0x05,
117    /// See [CqlRequestKind::Query].
118    Query = 0x07,
119    /// See [CqlRequestKind::Prepare].
120    Prepare = 0x09,
121    /// See [CqlRequestKind::Execute].
122    Execute = 0x0A,
123    /// See [CqlRequestKind::Register].
124    Register = 0x0B,
125    /// See [CqlRequestKind::Batch].
126    Batch = 0x0D,
127    /// See [CqlRequestKind::AuthResponse].
128    AuthResponse = 0x0F,
129}
130
131impl TryFrom<u8> for RequestOpcode {
132    type Error = TryFromPrimitiveError<u8>;
133
134    fn try_from(value: u8) -> Result<Self, Self::Error> {
135        match value {
136            0x01 => Ok(Self::Startup),
137            0x05 => Ok(Self::Options),
138            0x07 => Ok(Self::Query),
139            0x09 => Ok(Self::Prepare),
140            0x0A => Ok(Self::Execute),
141            0x0B => Ok(Self::Register),
142            0x0D => Ok(Self::Batch),
143            0x0F => Ok(Self::AuthResponse),
144            _ => Err(TryFromPrimitiveError {
145                enum_name: "RequestOpcode",
146                primitive: value,
147            }),
148        }
149    }
150}
151
152/// Requests that can be serialized into a CQL frame.
153pub trait SerializableRequest {
154    /// Opcode of the request, used to identify the request type in the CQL frame.
155    const OPCODE: RequestOpcode;
156
157    /// Serializes the request into the provided buffer.
158    fn serialize(&self, buf: &mut Vec<u8>) -> Result<(), CqlRequestSerializationError>;
159
160    /// Serializes the request into a heap-allocated `Bytes` object.
161    fn to_bytes(&self) -> Result<Bytes, CqlRequestSerializationError> {
162        let mut v = Vec::new();
163        self.serialize(&mut v)?;
164        Ok(v.into())
165    }
166}
167
168/// Requests that can be deserialized from a CQL frame.
169///
170/// Not intended for driver's direct usage (as driver has no interest in deserialising CQL requests),
171/// but very useful for testing (e.g. asserting that the sent requests have proper parameters set).
172pub trait DeserializableRequest: SerializableRequest + Sized {
173    /// Deserializes the request from the provided buffer.
174    /// Use [DeserializableRequest::deserialize_with_features] instead, because some frame types
175    /// require knowing protocol features for correct deserialization.
176    #[deprecated(since = "1.4.0", note = "Use deserialize_with_features instead")]
177    fn deserialize(buf: &mut &[u8]) -> Result<Self, RequestDeserializationError>;
178
179    fn deserialize_with_features(
180        buf: &mut &[u8],
181        #[allow(unused_variables)] features: &ProtocolFeatures,
182    ) -> Result<Self, RequestDeserializationError> {
183        #[expect(deprecated)]
184        Self::deserialize(buf)
185    }
186}
187
188/// An error type returned by [`DeserializableRequest::deserialize`].
189/// This is not intended for driver's direct usage. It's a testing utility,
190/// mainly used by `scylla-proxy` crate.
191#[doc(hidden)]
192#[derive(Debug, Error)]
193pub enum RequestDeserializationError {
194    #[error("Low level deser error: {0}")]
195    LowLevelDeserialization(#[from] LowLevelDeserializationError),
196    #[error("Io error: {0}")]
197    IoError(#[from] std::io::Error),
198    #[error("Specified flags are not recognised: {:02x}", flags)]
199    UnknownFlags { flags: u8 },
200    #[error("Named values in frame are currently unsupported")]
201    NamedValuesUnsupported,
202    #[error("Expected SerialConsistency, got regular Consistency: {0}")]
203    ExpectedSerialConsistency(Consistency),
204    #[error(transparent)]
205    BatchTypeParse(#[from] BatchTypeParseError),
206    #[error("Unexpected batch statement kind: {0}")]
207    UnexpectedBatchStatementKind(u8),
208}
209
210/// A CQL request that can be sent to the server.
211#[non_exhaustive] // TODO: add remaining request types
212#[deprecated(
213    since = "1.4.0",
214    note = "Does not support Scylla metadata id extension. Use RequestV2 instead."
215)]
216pub enum Request<'r> {
217    /// QUERY request, used to execute a single unprepared statement.
218    Query(Query<'r>),
219    /// EXECUTE request, used to execute a single prepared statement.
220    #[expect(deprecated)]
221    Execute(Execute<'r>),
222    /// BATCH request, used to execute a batch of (prepared, unprepared, or mix of both)
223    /// statements.
224    Batch(Batch<'r, BatchStatement<'r>, Vec<SerializedValues>>),
225}
226
227#[expect(deprecated)]
228impl Request<'_> {
229    /// Deserializes the request from the provided buffer.
230    pub fn deserialize(
231        buf: &mut &[u8],
232        opcode: RequestOpcode,
233    ) -> Result<Self, RequestDeserializationError> {
234        match opcode {
235            RequestOpcode::Query => Query::deserialize(buf).map(Self::Query),
236            RequestOpcode::Execute => Execute::deserialize(buf).map(Self::Execute),
237            RequestOpcode::Batch => Batch::deserialize(buf).map(Self::Batch),
238            _ => unimplemented!(
239                "Deserialization of opcode {:?} is not yet supported",
240                opcode
241            ),
242        }
243    }
244
245    /// Retrieves consistency from request frame, if present.
246    pub fn get_consistency(&self) -> Option<Consistency> {
247        match self {
248            Request::Query(q) => Some(q.parameters.consistency),
249            Request::Execute(e) => Some(e.parameters.consistency),
250            Request::Batch(b) => Some(b.consistency),
251            #[expect(unreachable_patterns)] // until other opcodes are supported
252            _ => None,
253        }
254    }
255
256    /// Retrieves serial consistency from request frame.
257    pub fn get_serial_consistency(&self) -> Option<Option<SerialConsistency>> {
258        match self {
259            Request::Query(q) => Some(q.parameters.serial_consistency),
260            Request::Execute(e) => Some(e.parameters.serial_consistency),
261            Request::Batch(b) => Some(b.serial_consistency),
262            #[expect(unreachable_patterns)] // until other opcodes are supported
263            _ => None,
264        }
265    }
266}
267
268/// A CQL request that can be sent to the server.
269#[non_exhaustive] // TODO: add remaining request types
270pub enum RequestV2<'r> {
271    /// QUERY request, used to execute a single unprepared statement.
272    Query(Query<'r>),
273    /// EXECUTE request, used to execute a single prepared statement.
274    Execute(ExecuteV2<'r>),
275    /// BATCH request, used to execute a batch of (prepared, unprepared, or mix of both)
276    /// statements.
277    Batch(Batch<'r, BatchStatement<'r>, Vec<SerializedValues>>),
278}
279
280impl RequestV2<'_> {
281    /// Deserializes the request from the provided buffer.
282    pub fn deserialize(
283        buf: &mut &[u8],
284        opcode: RequestOpcode,
285        features: &ProtocolFeatures,
286    ) -> Result<Self, RequestDeserializationError> {
287        match opcode {
288            RequestOpcode::Query => {
289                Query::deserialize_with_features(buf, features).map(Self::Query)
290            }
291            RequestOpcode::Execute => {
292                ExecuteV2::deserialize_with_features(buf, features).map(Self::Execute)
293            }
294            RequestOpcode::Batch => {
295                Batch::deserialize_with_features(buf, features).map(Self::Batch)
296            }
297            _ => unimplemented!(
298                "Deserialization of opcode {:?} is not yet supported",
299                opcode
300            ),
301        }
302    }
303
304    /// Retrieves consistency from request frame, if present.
305    pub fn get_consistency(&self) -> Option<Consistency> {
306        match self {
307            Self::Query(q) => Some(q.parameters.consistency),
308            Self::Execute(e) => Some(e.parameters.consistency),
309            Self::Batch(b) => Some(b.consistency),
310            #[expect(unreachable_patterns)] // until other opcodes are supported
311            _ => None,
312        }
313    }
314
315    /// Retrieves serial consistency from request frame.
316    pub fn get_serial_consistency(&self) -> Option<Option<SerialConsistency>> {
317        match self {
318            Self::Query(q) => Some(q.parameters.serial_consistency),
319            Self::Execute(e) => Some(e.parameters.serial_consistency),
320            Self::Batch(b) => Some(b.serial_consistency),
321            #[expect(unreachable_patterns)] // until other opcodes are supported
322            _ => None,
323        }
324    }
325}
326
327#[cfg(test)]
328mod tests {
329    use std::{borrow::Cow, ops::Deref};
330
331    use bytes::Bytes;
332
333    use super::query::PagingState;
334    use crate::Consistency;
335    use crate::frame::protocol_features::ProtocolFeatures;
336    use crate::frame::request::batch::{Batch, BatchStatement, BatchType};
337    #[expect(deprecated)]
338    use crate::frame::request::execute::Execute;
339    use crate::frame::request::execute::ExecuteV2;
340    use crate::frame::request::query::{Query, QueryParameters};
341    use crate::frame::request::{DeserializableRequest, SerializableRequest};
342    use crate::frame::response::result::{ColumnType, NativeType};
343    use crate::frame::types::{self, SerialConsistency};
344    use crate::serialize::row::SerializedValues;
345
346    #[test]
347    fn request_ser_de_identity() {
348        // Query
349        let contents = Cow::Borrowed("SELECT host_id from system.peers");
350        let parameters = QueryParameters {
351            consistency: Consistency::All,
352            serial_consistency: Some(SerialConsistency::Serial),
353            timestamp: None,
354            page_size: Some(323),
355            paging_state: PagingState::new_from_raw_bytes(&[2_u8, 1, 3, 7] as &[u8]),
356            skip_metadata: false,
357            values: {
358                let mut vals = SerializedValues::new();
359                vals.add_value(&2137, &ColumnType::Native(NativeType::Int))
360                    .unwrap();
361                Cow::Owned(vals)
362            },
363        };
364        let query = Query {
365            contents,
366            parameters,
367        };
368
369        {
370            let mut buf = Vec::new();
371            query.serialize(&mut buf).unwrap();
372
373            let query_deserialized =
374                Query::deserialize_with_features(&mut &buf[..], &Default::default()).unwrap();
375            assert_eq!(&query_deserialized, &query);
376        }
377
378        // Legacy Execute
379        let id: Bytes = vec![2, 4, 5, 2, 6, 7, 3, 1].into();
380        let parameters = QueryParameters {
381            consistency: Consistency::Any,
382            serial_consistency: None,
383            timestamp: Some(3423434),
384            page_size: None,
385            paging_state: PagingState::start(),
386            skip_metadata: false,
387            values: {
388                let mut vals = SerializedValues::new();
389                vals.add_value(&42, &ColumnType::Native(NativeType::Int))
390                    .unwrap();
391                vals.add_value(&2137, &ColumnType::Native(NativeType::Int))
392                    .unwrap();
393                Cow::Owned(vals)
394            },
395        };
396
397        #[expect(deprecated)]
398        let execute = Execute {
399            id,
400            parameters: parameters.clone(),
401        };
402        {
403            let mut buf = Vec::new();
404            execute.serialize(&mut buf).unwrap();
405
406            #[expect(deprecated)]
407            let execute_deserialized =
408                Execute::deserialize_with_features(&mut &buf[..], &Default::default()).unwrap();
409            assert_eq!(&execute_deserialized, &execute);
410        }
411
412        // New Execute
413        let id = [2, 4, 5, 2, 6, 7, 3, 1].as_slice().into();
414        let result_metadata_id = Some([2, 4, 5, 2, 6, 7, 3, 1].as_slice().into());
415        let execute_with_id = ExecuteV2 {
416            id,
417            result_metadata_id,
418            parameters,
419        };
420        {
421            let mut buf = Vec::new();
422            execute_with_id.serialize(&mut buf).unwrap();
423
424            let features = ProtocolFeatures {
425                scylla_metadata_id_supported: true,
426                ..Default::default()
427            };
428            let execute_deserialized =
429                ExecuteV2::deserialize_with_features(&mut &buf[..], &features).unwrap();
430            assert_eq!(&execute_deserialized, &execute_with_id);
431        }
432
433        // Batch
434        let statements = vec![
435            BatchStatement::Query {
436                text: query.contents,
437            },
438            BatchStatement::Prepared {
439                id: Cow::Borrowed(execute_with_id.id.as_ref()),
440            },
441        ];
442        let batch = Batch {
443            statements: Cow::Owned(statements),
444            batch_type: BatchType::Logged,
445            consistency: Consistency::EachQuorum,
446            serial_consistency: Some(SerialConsistency::LocalSerial),
447            timestamp: Some(32432),
448
449            // Not execute's values, because named values are not supported in batches.
450            values: vec![
451                query.parameters.values.deref().clone(),
452                query.parameters.values.deref().clone(),
453            ],
454        };
455        {
456            let mut buf = Vec::new();
457            batch.serialize(&mut buf).unwrap();
458
459            let batch_deserialized =
460                Batch::deserialize_with_features(&mut &buf[..], &Default::default()).unwrap();
461            assert_eq!(&batch_deserialized, &batch);
462        }
463    }
464
465    #[test]
466    fn deser_rejects_unknown_flags() {
467        // Query
468        let contents = Cow::Borrowed("SELECT host_id from system.peers");
469        let parameters = QueryParameters {
470            consistency: Default::default(),
471            serial_consistency: Some(SerialConsistency::LocalSerial),
472            timestamp: None,
473            page_size: None,
474            paging_state: PagingState::start(),
475            skip_metadata: false,
476            values: Cow::Borrowed(SerializedValues::EMPTY),
477        };
478        let query = Query {
479            contents: contents.clone(),
480            parameters,
481        };
482
483        {
484            let mut buf = Vec::new();
485            query.serialize(&mut buf).unwrap();
486
487            // Sanity check: query deserializes to the equivalent.
488            let query_deserialized =
489                Query::deserialize_with_features(&mut &buf[..], &Default::default()).unwrap();
490            assert_eq!(&query_deserialized.contents, &query.contents);
491            assert_eq!(&query_deserialized.parameters, &query.parameters);
492
493            // Now modify flags by adding an unknown one.
494            // Find flags in buffer:
495            let mut buf_ptr = buf.as_slice();
496            let serialised_contents = types::read_long_string(&mut buf_ptr).unwrap();
497            assert_eq!(serialised_contents, contents);
498
499            // Now buf_ptr points at consistency.
500            let consistency = types::read_consistency(&mut buf_ptr).unwrap();
501            assert_eq!(consistency, Consistency::default());
502
503            // Now buf_ptr points at flags, but it is immutable. Get mutable reference into the buffer.
504            let flags_idx = buf.len() - buf_ptr.len();
505            let flags_mut = &mut buf[flags_idx];
506
507            // This assumes that the following flag is unknown, which is true at the time of writing this test.
508            *flags_mut |= 0x80;
509
510            // Unknown flag should lead to frame rejection, as unknown flags can be new protocol extensions
511            // leading to different semantics.
512            let _parse_error =
513                Query::deserialize_with_features(&mut &buf[..], &Default::default()).unwrap_err();
514        }
515
516        // Batch
517        let statements = vec![BatchStatement::Query {
518            text: query.contents,
519        }];
520        let batch = Batch {
521            statements: Cow::Owned(statements),
522            batch_type: BatchType::Logged,
523            consistency: Consistency::EachQuorum,
524            serial_consistency: None,
525            timestamp: None,
526
527            values: vec![query.parameters.values.deref().clone()],
528        };
529        {
530            let mut buf = Vec::new();
531            batch.serialize(&mut buf).unwrap();
532
533            // Sanity check: batch deserializes to the equivalent.
534            let batch_deserialized =
535                Batch::deserialize_with_features(&mut &buf[..], &Default::default()).unwrap();
536            assert_eq!(batch, batch_deserialized);
537
538            // Now modify flags by adding an unknown one.
539            // There are no timestamp nor serial consistency, so flags are the last byte in the buf.
540            let buf_len = buf.len();
541            let flags_mut = &mut buf[buf_len - 1];
542            // This assumes that the following flag is unknown, which is true at the time of writing this test.
543            *flags_mut |= 0x80;
544
545            // Unknown flag should lead to frame rejection, as unknown flags can be new protocol extensions
546            // leading to different semantics.
547            let _parse_error =
548                Batch::deserialize_with_features(&mut &buf[..], &Default::default()).unwrap_err();
549        }
550    }
551}