1pub mod auth_response;
4pub mod batch;
5pub mod execute;
6pub mod options;
7pub mod prepare;
8pub mod query;
9pub mod register;
10pub mod startup;
11
12use batch::BatchTypeParseError;
13use thiserror::Error;
14
15use crate::Consistency;
16use crate::frame::protocol_features::ProtocolFeatures;
17use crate::frame::request::execute::ExecuteV2;
18use crate::serialize::row::SerializedValues;
19use bytes::Bytes;
20
21pub use auth_response::AuthResponse;
22pub use batch::Batch;
23#[expect(deprecated)]
24pub use execute::Execute;
25pub use options::Options;
26pub use prepare::Prepare;
27pub use query::Query;
28pub use startup::Startup;
29
30use self::batch::BatchStatement;
31
32use super::TryFromPrimitiveError;
33use super::frame_errors::{CqlRequestSerializationError, LowLevelDeserializationError};
34use super::types::SerialConsistency;
35
/// Kind of a CQL request, without any payload.
///
/// The [`Display`](std::fmt::Display) implementation renders each kind as an
/// upper-case name (e.g. `AUTH_RESPONSE`).
///
/// The enum is `#[non_exhaustive]`: code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, Copy, Clone)]
#[non_exhaustive]
pub enum CqlRequestKind {
    /// Rendered as `STARTUP`.
    Startup,

    /// Rendered as `AUTH_RESPONSE`.
    AuthResponse,

    /// Rendered as `OPTIONS`.
    Options,

    /// Rendered as `QUERY`.
    Query,

    /// Rendered as `PREPARE`.
    Prepare,

    /// Rendered as `EXECUTE`.
    Execute,

    /// Rendered as `BATCH`.
    Batch,

    /// Rendered as `REGISTER`.
    Register,
}

impl std::fmt::Display for CqlRequestKind {
    /// Writes the upper-case name of the request kind.
    ///
    /// Width, fill, alignment and precision given in the format spec
    /// (e.g. `{:>10}`) are honoured, exactly as they are for `&str`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let kind_str = match self {
            CqlRequestKind::Startup => "STARTUP",
            CqlRequestKind::AuthResponse => "AUTH_RESPONSE",
            CqlRequestKind::Options => "OPTIONS",
            CqlRequestKind::Query => "QUERY",
            CqlRequestKind::Prepare => "PREPARE",
            CqlRequestKind::Execute => "EXECUTE",
            CqlRequestKind::Batch => "BATCH",
            CqlRequestKind::Register => "REGISTER",
        };

        // `pad` rather than `write_str`: `write_str` silently ignores the
        // formatter's width/fill/alignment/precision flags. With no flags set
        // the output is identical to a plain `write_str`.
        f.pad(kind_str)
    }
}
108
/// Opcode of a CQL request.
///
/// The enum is `#[repr(u8)]` and every variant's discriminant is its opcode
/// value, so `opcode as u8` yields the numeric opcode. The reverse direction
/// is provided by the `TryFrom<u8>` implementation.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[repr(u8)]
pub enum RequestOpcode {
    /// Opcode `0x01`.
    Startup = 0x01,
    /// Opcode `0x05`.
    Options = 0x05,
    /// Opcode `0x07`.
    Query = 0x07,
    /// Opcode `0x09`.
    Prepare = 0x09,
    /// Opcode `0x0A`.
    Execute = 0x0A,
    /// Opcode `0x0B`.
    Register = 0x0B,
    /// Opcode `0x0D`.
    Batch = 0x0D,
    /// Opcode `0x0F`.
    AuthResponse = 0x0F,
}
130
131impl TryFrom<u8> for RequestOpcode {
132 type Error = TryFromPrimitiveError<u8>;
133
134 fn try_from(value: u8) -> Result<Self, Self::Error> {
135 match value {
136 0x01 => Ok(Self::Startup),
137 0x05 => Ok(Self::Options),
138 0x07 => Ok(Self::Query),
139 0x09 => Ok(Self::Prepare),
140 0x0A => Ok(Self::Execute),
141 0x0B => Ok(Self::Register),
142 0x0D => Ok(Self::Batch),
143 0x0F => Ok(Self::AuthResponse),
144 _ => Err(TryFromPrimitiveError {
145 enum_name: "RequestOpcode",
146 primitive: value,
147 }),
148 }
149 }
150}
151
152pub trait SerializableRequest {
154 const OPCODE: RequestOpcode;
156
157 fn serialize(&self, buf: &mut Vec<u8>) -> Result<(), CqlRequestSerializationError>;
159
160 fn to_bytes(&self) -> Result<Bytes, CqlRequestSerializationError> {
162 let mut v = Vec::new();
163 self.serialize(&mut v)?;
164 Ok(v.into())
165 }
166}
167
168pub trait DeserializableRequest: SerializableRequest + Sized {
173 #[deprecated(since = "1.4.0", note = "Use deserialize_with_features instead")]
177 fn deserialize(buf: &mut &[u8]) -> Result<Self, RequestDeserializationError>;
178
179 fn deserialize_with_features(
180 buf: &mut &[u8],
181 #[allow(unused_variables)] features: &ProtocolFeatures,
182 ) -> Result<Self, RequestDeserializationError> {
183 #[expect(deprecated)]
184 Self::deserialize(buf)
185 }
186}
187
188#[doc(hidden)]
192#[derive(Debug, Error)]
193pub enum RequestDeserializationError {
194 #[error("Low level deser error: {0}")]
195 LowLevelDeserialization(#[from] LowLevelDeserializationError),
196 #[error("Io error: {0}")]
197 IoError(#[from] std::io::Error),
198 #[error("Specified flags are not recognised: {:02x}", flags)]
199 UnknownFlags { flags: u8 },
200 #[error("Named values in frame are currently unsupported")]
201 NamedValuesUnsupported,
202 #[error("Expected SerialConsistency, got regular Consistency: {0}")]
203 ExpectedSerialConsistency(Consistency),
204 #[error(transparent)]
205 BatchTypeParse(#[from] BatchTypeParseError),
206 #[error("Unexpected batch statement kind: {0}")]
207 UnexpectedBatchStatementKind(u8),
208}
209
210#[non_exhaustive] #[deprecated(
213 since = "1.4.0",
214 note = "Does not support Scylla metadata id extension. Use RequestV2 instead."
215)]
216pub enum Request<'r> {
217 Query(Query<'r>),
219 #[expect(deprecated)]
221 Execute(Execute<'r>),
222 Batch(Batch<'r, BatchStatement<'r>, Vec<SerializedValues>>),
225}
226
227#[expect(deprecated)]
228impl Request<'_> {
229 pub fn deserialize(
231 buf: &mut &[u8],
232 opcode: RequestOpcode,
233 ) -> Result<Self, RequestDeserializationError> {
234 match opcode {
235 RequestOpcode::Query => Query::deserialize(buf).map(Self::Query),
236 RequestOpcode::Execute => Execute::deserialize(buf).map(Self::Execute),
237 RequestOpcode::Batch => Batch::deserialize(buf).map(Self::Batch),
238 _ => unimplemented!(
239 "Deserialization of opcode {:?} is not yet supported",
240 opcode
241 ),
242 }
243 }
244
245 pub fn get_consistency(&self) -> Option<Consistency> {
247 match self {
248 Request::Query(q) => Some(q.parameters.consistency),
249 Request::Execute(e) => Some(e.parameters.consistency),
250 Request::Batch(b) => Some(b.consistency),
251 #[expect(unreachable_patterns)] _ => None,
253 }
254 }
255
256 pub fn get_serial_consistency(&self) -> Option<Option<SerialConsistency>> {
258 match self {
259 Request::Query(q) => Some(q.parameters.serial_consistency),
260 Request::Execute(e) => Some(e.parameters.serial_consistency),
261 Request::Batch(b) => Some(b.serial_consistency),
262 #[expect(unreachable_patterns)] _ => None,
264 }
265 }
266}
267
268#[non_exhaustive] pub enum RequestV2<'r> {
271 Query(Query<'r>),
273 Execute(ExecuteV2<'r>),
275 Batch(Batch<'r, BatchStatement<'r>, Vec<SerializedValues>>),
278}
279
280impl RequestV2<'_> {
281 pub fn deserialize(
283 buf: &mut &[u8],
284 opcode: RequestOpcode,
285 features: &ProtocolFeatures,
286 ) -> Result<Self, RequestDeserializationError> {
287 match opcode {
288 RequestOpcode::Query => {
289 Query::deserialize_with_features(buf, features).map(Self::Query)
290 }
291 RequestOpcode::Execute => {
292 ExecuteV2::deserialize_with_features(buf, features).map(Self::Execute)
293 }
294 RequestOpcode::Batch => {
295 Batch::deserialize_with_features(buf, features).map(Self::Batch)
296 }
297 _ => unimplemented!(
298 "Deserialization of opcode {:?} is not yet supported",
299 opcode
300 ),
301 }
302 }
303
304 pub fn get_consistency(&self) -> Option<Consistency> {
306 match self {
307 Self::Query(q) => Some(q.parameters.consistency),
308 Self::Execute(e) => Some(e.parameters.consistency),
309 Self::Batch(b) => Some(b.consistency),
310 #[expect(unreachable_patterns)] _ => None,
312 }
313 }
314
315 pub fn get_serial_consistency(&self) -> Option<Option<SerialConsistency>> {
317 match self {
318 Self::Query(q) => Some(q.parameters.serial_consistency),
319 Self::Execute(e) => Some(e.parameters.serial_consistency),
320 Self::Batch(b) => Some(b.serial_consistency),
321 #[expect(unreachable_patterns)] _ => None,
323 }
324 }
325}
326
#[cfg(test)]
mod tests {
    use std::{borrow::Cow, ops::Deref};

    use bytes::Bytes;

    use super::query::PagingState;
    use crate::Consistency;
    use crate::frame::protocol_features::ProtocolFeatures;
    use crate::frame::request::batch::{Batch, BatchStatement, BatchType};
    #[expect(deprecated)]
    use crate::frame::request::execute::Execute;
    use crate::frame::request::execute::ExecuteV2;
    use crate::frame::request::query::{Query, QueryParameters};
    use crate::frame::request::{DeserializableRequest, SerializableRequest};
    use crate::frame::response::result::{ColumnType, NativeType};
    use crate::frame::types::{self, SerialConsistency};
    use crate::serialize::row::SerializedValues;

    /// Serializes `request`, deserializes the produced bytes using `features`,
    /// and asserts that the result equals the original request.
    fn assert_round_trip<R>(request: &R, features: &ProtocolFeatures)
    where
        R: DeserializableRequest + PartialEq + std::fmt::Debug,
    {
        let mut wire = Vec::new();
        request.serialize(&mut wire).unwrap();

        let decoded = R::deserialize_with_features(&mut wire.as_slice(), features).unwrap();
        assert_eq!(&decoded, request);
    }

    /// Each request type must survive a serialize -> deserialize round trip.
    #[test]
    fn request_ser_de_identity() {
        let default_features = ProtocolFeatures::default();

        // --- Query ---
        let query_text = Cow::Borrowed("SELECT host_id from system.peers");
        let query_parameters = QueryParameters {
            consistency: Consistency::All,
            serial_consistency: Some(SerialConsistency::Serial),
            timestamp: None,
            page_size: Some(323),
            paging_state: PagingState::new_from_raw_bytes(&[2_u8, 1, 3, 7] as &[u8]),
            skip_metadata: false,
            values: {
                let mut bound = SerializedValues::new();
                bound
                    .add_value(&2137, &ColumnType::Native(NativeType::Int))
                    .unwrap();
                Cow::Owned(bound)
            },
        };
        let query = Query {
            contents: query_text,
            parameters: query_parameters,
        };
        assert_round_trip(&query, &default_features);

        // --- Execute (deprecated) ---
        let id: Bytes = vec![2, 4, 5, 2, 6, 7, 3, 1].into();
        let execute_parameters = QueryParameters {
            consistency: Consistency::Any,
            serial_consistency: None,
            timestamp: Some(3423434),
            page_size: None,
            paging_state: PagingState::start(),
            skip_metadata: false,
            values: {
                let mut bound = SerializedValues::new();
                bound
                    .add_value(&42, &ColumnType::Native(NativeType::Int))
                    .unwrap();
                bound
                    .add_value(&2137, &ColumnType::Native(NativeType::Int))
                    .unwrap();
                Cow::Owned(bound)
            },
        };

        #[expect(deprecated)]
        let execute = Execute {
            id,
            parameters: execute_parameters.clone(),
        };
        assert_round_trip(&execute, &default_features);

        // --- ExecuteV2, with a result metadata id ---
        let id = [2, 4, 5, 2, 6, 7, 3, 1].as_slice().into();
        let result_metadata_id = Some([2, 4, 5, 2, 6, 7, 3, 1].as_slice().into());
        let execute_with_id = ExecuteV2 {
            id,
            result_metadata_id,
            parameters: execute_parameters,
        };
        // The metadata id feature is switched on for this round trip.
        let metadata_id_features = ProtocolFeatures {
            scylla_metadata_id_supported: true,
            ..Default::default()
        };
        assert_round_trip(&execute_with_id, &metadata_id_features);

        // --- Batch: one unprepared and one prepared statement ---
        let statements = vec![
            BatchStatement::Query {
                text: query.contents,
            },
            BatchStatement::Prepared {
                id: Cow::Borrowed(execute_with_id.id.as_ref()),
            },
        ];
        let batch = Batch {
            statements: Cow::Owned(statements),
            batch_type: BatchType::Logged,
            consistency: Consistency::EachQuorum,
            serial_consistency: Some(SerialConsistency::LocalSerial),
            timestamp: Some(32432),

            // One set of values per statement; both reuse the query's values.
            values: vec![
                query.parameters.values.deref().clone(),
                query.parameters.values.deref().clone(),
            ],
        };
        assert_round_trip(&batch, &default_features);
    }

    /// Setting an extra high bit in the flags byte must make deserialization fail.
    #[test]
    fn deser_rejects_unknown_flags() {
        let query_text = Cow::Borrowed("SELECT host_id from system.peers");
        let query_parameters = QueryParameters {
            consistency: Default::default(),
            serial_consistency: Some(SerialConsistency::LocalSerial),
            timestamp: None,
            page_size: None,
            paging_state: PagingState::start(),
            skip_metadata: false,
            values: Cow::Borrowed(SerializedValues::EMPTY),
        };
        let query = Query {
            contents: query_text.clone(),
            parameters: query_parameters,
        };

        {
            let mut wire = Vec::new();
            query.serialize(&mut wire).unwrap();

            // Baseline: the untouched bytes deserialize back to the same query.
            let decoded =
                Query::deserialize_with_features(&mut &wire[..], &Default::default()).unwrap();
            assert_eq!(&decoded.contents, &query.contents);
            assert_eq!(&decoded.parameters, &query.parameters);

            // Walk over the statement text and the consistency; whatever
            // follows them is what this test treats as the flags byte.
            let mut cursor = wire.as_slice();
            let serialised_contents = types::read_long_string(&mut cursor).unwrap();
            assert_eq!(serialised_contents, query_text);

            let consistency = types::read_consistency(&mut cursor).unwrap();
            assert_eq!(consistency, Consistency::default());

            let flags_idx = wire.len() - cursor.len();

            // Set the top bit of the flags byte.
            wire[flags_idx] |= 0x80;

            // Deserialization is now expected to fail.
            let _parse_error =
                Query::deserialize_with_features(&mut &wire[..], &Default::default()).unwrap_err();
        }

        let statements = vec![BatchStatement::Query {
            text: query.contents,
        }];
        let batch = Batch {
            statements: Cow::Owned(statements),
            batch_type: BatchType::Logged,
            consistency: Consistency::EachQuorum,
            serial_consistency: None,
            timestamp: None,

            values: vec![query.parameters.values.deref().clone()],
        };
        {
            let mut wire = Vec::new();
            batch.serialize(&mut wire).unwrap();

            // Baseline: the untouched bytes deserialize back to the same batch.
            let decoded =
                Batch::deserialize_with_features(&mut &wire[..], &Default::default()).unwrap();
            assert_eq!(batch, decoded);

            // This test treats the last serialized byte as the flags byte
            // (the batch has neither a serial consistency nor a timestamp).
            let flags = wire
                .last_mut()
                .expect("serialized batch must not be empty");
            *flags |= 0x80;

            // Deserialization is now expected to fail.
            let _parse_error =
                Batch::deserialize_with_features(&mut &wire[..], &Default::default()).unwrap_err();
        }
    }
}