Skip to main content

cassandra_protocol/types/
value.rs

1use std::cmp::Eq;
2use std::collections::{BTreeMap, HashMap};
3use std::convert::Into;
4use std::fmt::Debug;
5use std::hash::Hash;
6use std::net::IpAddr;
7use std::num::{NonZeroI16, NonZeroI32, NonZeroI64, NonZeroI8};
8
9use chrono::prelude::*;
10use num_bigint::BigInt;
11use time::PrimitiveDateTime;
12use uuid::Uuid;
13
14use super::blob::Blob;
15use super::decimal::Decimal;
16use super::duration::Duration;
17use super::*;
18use crate::Error;
19
/// Length marker written (and recognized when decoding) in place of a byte count for `Value::Null`.
const NULL_INT_VALUE: i32 = -1;
/// Length marker written (and recognized when decoding) in place of a byte count for `Value::NotSet`.
const NOT_SET_INT_VALUE: i32 = -2;
22
/// A single Cassandra value as it travels on the wire: either a byte payload, an explicit
/// null, or a value that was left unset.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Value {
    /// Raw payload bytes; an empty vector is a valid (empty) value.
    Some(Vec<u8>),
    /// Explicit null.
    Null,
    /// The value was not provided at all.
    NotSet,
}
30
31impl Value {
32    pub fn new<B>(v: B) -> Value
33    where
34        B: Into<Bytes>,
35    {
36        Value::Some(v.into().0)
37    }
38}
39
40impl Serialize for Value {
41    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
42        match self {
43            Value::Null => NULL_INT_VALUE.serialize(cursor, version),
44            Value::NotSet => NOT_SET_INT_VALUE.serialize(cursor, version),
45            Value::Some(value) => {
46                let len = value.len() as CInt;
47                len.serialize(cursor, version);
48                value.serialize(cursor, version);
49            }
50        }
51    }
52}
53
54impl FromCursor for Value {
55    fn from_cursor(cursor: &mut Cursor<&[u8]>, _version: Version) -> Result<Value, Error> {
56        // Per the protocol spec: a [value] is encoded as a [int] n followed by
57        // n bytes when n is non-negative. Special negative encodings stand for
58        // null (-1) and "not set" (-2). A length of zero is therefore valid and
59        // represents an empty value (e.g. an empty BLOB or empty TEXT).
60        let value_size = {
61            let mut buff = [0; INT_LEN];
62            cursor.read_exact(&mut buff)?;
63            CInt::from_be_bytes(buff)
64        };
65
66        if value_size >= 0 {
67            // covers both positive lengths and the empty-value (length 0) case
68            Ok(Value::Some(cursor_next_value(cursor, value_size as usize)?))
69        } else if value_size == NULL_INT_VALUE {
70            Ok(Value::Null)
71        } else if value_size == NOT_SET_INT_VALUE {
72            Ok(Value::NotSet)
73        } else {
74            // any other negative value is not part of the protocol
75            Err(Error::General("Could not decode query values".into()))
76        }
77    }
78}
79
// We assume here that primitive value serialization does not change across protocol versions,
// which gives us a simpler user API.
82
83impl<T: Into<Bytes>> From<T> for Value {
84    fn from(b: T) -> Value {
85        Value::new(b.into())
86    }
87}
88
89impl<T: Into<Bytes>> From<Option<T>> for Value {
90    fn from(b: Option<T>) -> Value {
91        match b {
92            Some(b) => Value::new(b.into()),
93            None => Value::Null,
94        }
95    }
96}
97
98#[derive(Debug, Clone, Constructor)]
99pub struct Bytes(Vec<u8>);
100
101impl Bytes {
102    /// Consumes `Bytes` and returns the inner `Vec<u8>`
103    pub fn into_inner(self) -> Vec<u8> {
104        self.0
105    }
106}
107
108impl From<String> for Bytes {
109    #[inline]
110    fn from(value: String) -> Self {
111        Bytes(value.into_bytes())
112    }
113}
114
115impl From<&str> for Bytes {
116    #[inline]
117    fn from(value: &str) -> Self {
118        Bytes(value.as_bytes().to_vec())
119    }
120}
121
122impl From<i8> for Bytes {
123    #[inline]
124    fn from(value: i8) -> Self {
125        Bytes(vec![value as u8])
126    }
127}
128
129impl From<i16> for Bytes {
130    #[inline]
131    fn from(value: i16) -> Self {
132        Bytes(to_short(value))
133    }
134}
135
136impl From<i32> for Bytes {
137    #[inline]
138    fn from(value: i32) -> Self {
139        Bytes(to_int(value))
140    }
141}
142
143impl From<i64> for Bytes {
144    #[inline]
145    fn from(value: i64) -> Self {
146        Bytes(to_bigint(value))
147    }
148}
149
150impl From<u8> for Bytes {
151    #[inline]
152    fn from(value: u8) -> Self {
153        Bytes(vec![value])
154    }
155}
156
157impl From<u16> for Bytes {
158    #[inline]
159    fn from(value: u16) -> Self {
160        Bytes(to_u_short(value))
161    }
162}
163
164impl From<u32> for Bytes {
165    #[inline]
166    fn from(value: u32) -> Self {
167        Bytes(to_u_int(value))
168    }
169}
170
171impl From<u64> for Bytes {
172    #[inline]
173    fn from(value: u64) -> Self {
174        Bytes(to_u_big(value))
175    }
176}
177
178impl From<NonZeroI8> for Bytes {
179    #[inline]
180    fn from(value: NonZeroI8) -> Self {
181        value.get().into()
182    }
183}
184
185impl From<NonZeroI16> for Bytes {
186    #[inline]
187    fn from(value: NonZeroI16) -> Self {
188        value.get().into()
189    }
190}
191
192impl From<NonZeroI32> for Bytes {
193    #[inline]
194    fn from(value: NonZeroI32) -> Self {
195        value.get().into()
196    }
197}
198
199impl From<NonZeroI64> for Bytes {
200    #[inline]
201    fn from(value: NonZeroI64) -> Self {
202        value.get().into()
203    }
204}
205
206impl From<bool> for Bytes {
207    #[inline]
208    fn from(value: bool) -> Self {
209        if value {
210            Bytes(vec![1])
211        } else {
212            Bytes(vec![0])
213        }
214    }
215}
216
217impl From<Uuid> for Bytes {
218    #[inline]
219    fn from(value: Uuid) -> Self {
220        Bytes(value.as_bytes().to_vec())
221    }
222}
223
224impl From<IpAddr> for Bytes {
225    #[inline]
226    fn from(value: IpAddr) -> Self {
227        match value {
228            IpAddr::V4(ip) => Bytes(ip.octets().to_vec()),
229            IpAddr::V6(ip) => Bytes(ip.octets().to_vec()),
230        }
231    }
232}
233
234impl From<f32> for Bytes {
235    #[inline]
236    fn from(value: f32) -> Self {
237        Bytes(to_float(value))
238    }
239}
240
241impl From<f64> for Bytes {
242    #[inline]
243    fn from(value: f64) -> Self {
244        Bytes(to_float_big(value))
245    }
246}
247
248impl From<PrimitiveDateTime> for Bytes {
249    #[inline]
250    fn from(value: PrimitiveDateTime) -> Self {
251        let ts: i64 =
252            value.assume_utc().unix_timestamp() * 1_000 + value.nanosecond() as i64 / 1_000_000;
253        Bytes(to_bigint(ts))
254    }
255}
256
257impl From<Blob> for Bytes {
258    #[inline]
259    fn from(value: Blob) -> Self {
260        Bytes(value.into_vec())
261    }
262}
263
264impl From<Decimal> for Bytes {
265    #[inline]
266    fn from(value: Decimal) -> Self {
267        Bytes(value.serialize_to_vec(Version::V4))
268    }
269}
270
271impl From<NaiveDateTime> for Bytes {
272    #[inline]
273    fn from(value: NaiveDateTime) -> Self {
274        value.and_utc().timestamp_millis().into()
275    }
276}
277
278impl From<DateTime<Utc>> for Bytes {
279    #[inline]
280    fn from(value: DateTime<Utc>) -> Self {
281        value.timestamp_millis().into()
282    }
283}
284
285impl From<Duration> for Bytes {
286    #[inline]
287    fn from(value: Duration) -> Self {
288        Bytes(value.serialize_to_vec(Version::V5))
289    }
290}
291
292impl<T: Into<Bytes>> From<Vec<T>> for Bytes {
293    fn from(vec: Vec<T>) -> Bytes {
294        let mut bytes = Vec::with_capacity(INT_LEN);
295        let len = vec.len() as CInt;
296
297        bytes.extend_from_slice(&len.to_be_bytes());
298
299        let mut cursor = Cursor::new(&mut bytes);
300        cursor.set_position(INT_LEN as u64);
301
302        for v in vec {
303            let b: Bytes = v.into();
304            Value::new(b).serialize(&mut cursor, Version::V4);
305        }
306
307        Bytes(bytes)
308    }
309}
310
311impl From<BigInt> for Bytes {
312    fn from(value: BigInt) -> Self {
313        Self(value.serialize_to_vec(Version::V4))
314    }
315}
316
317impl<K, V> From<HashMap<K, V>> for Bytes
318where
319    K: Into<Bytes> + Hash + Eq,
320    V: Into<Bytes>,
321{
322    fn from(map: HashMap<K, V>) -> Bytes {
323        let mut bytes = Vec::with_capacity(INT_LEN);
324        let len = map.len() as CInt;
325
326        bytes.extend_from_slice(&len.to_be_bytes());
327
328        let mut cursor = Cursor::new(&mut bytes);
329        cursor.set_position(INT_LEN as u64);
330
331        for (k, v) in map {
332            let key_bytes: Bytes = k.into();
333            let val_bytes: Bytes = v.into();
334
335            Value::new(key_bytes).serialize(&mut cursor, Version::V4);
336            Value::new(val_bytes).serialize(&mut cursor, Version::V4);
337        }
338
339        Bytes(bytes)
340    }
341}
342
343impl<K, V> From<BTreeMap<K, V>> for Bytes
344where
345    K: Into<Bytes> + Hash + Eq,
346    V: Into<Bytes>,
347{
348    fn from(map: BTreeMap<K, V>) -> Bytes {
349        let mut bytes = Vec::with_capacity(INT_LEN);
350        let len = map.len() as CInt;
351
352        bytes.extend_from_slice(&len.to_be_bytes());
353
354        let mut cursor = Cursor::new(&mut bytes);
355        cursor.set_position(INT_LEN as u64);
356
357        for (k, v) in map {
358            let key_bytes: Bytes = k.into();
359            let val_bytes: Bytes = v.into();
360
361            Value::new(key_bytes).serialize(&mut cursor, Version::V4);
362            Value::new(val_bytes).serialize(&mut cursor, Version::V4);
363        }
364
365        Bytes(bytes)
366    }
367}
368
#[cfg(test)]
mod tests {
    use super::*;

    /// Decodes a single `Value` from `raw` using protocol version V4.
    fn decode(raw: &[u8]) -> Result<Value, Error> {
        let mut cursor = Cursor::new(raw);
        Value::from_cursor(&mut cursor, Version::V4)
    }

    #[test]
    fn test_value_serialization() {
        let cases: Vec<(Value, Vec<u8>)> = vec![
            (Value::Some(vec![1]), vec![0, 0, 0, 1, 1]),
            (Value::Some(vec![1, 2, 3]), vec![0, 0, 0, 3, 1, 2, 3]),
            (Value::Null, vec![255, 255, 255, 255]),
            (Value::NotSet, vec![255, 255, 255, 254]),
        ];

        for (value, expected) in cases {
            assert_eq!(value.serialize_to_vec(Version::V4), expected);
        }
    }

    #[test]
    fn test_value_from_cursor_handles_all_lengths() {
        // Header 0: a zero-length value is valid (an empty string, an empty blob, etc.)
        // and must decode to an empty payload rather than being rejected as malformed.
        assert_eq!(decode(&[0, 0, 0, 0]).unwrap(), Value::Some(vec![]));

        // Positive header: that many payload bytes follow the 4-byte length.
        assert_eq!(
            decode(&[0, 0, 0, 3, 1, 2, 3]).unwrap(),
            Value::Some(vec![1, 2, 3])
        );

        // -1 (0xFFFFFFFF) means null.
        assert_eq!(decode(&[255, 255, 255, 255]).unwrap(), Value::Null);

        // -2 (0xFFFFFFFE) means "not set" (unbound bind variable).
        assert_eq!(decode(&[255, 255, 255, 254]).unwrap(), Value::NotSet);

        // Any other negative header (e.g. -3) is malformed and must error.
        assert!(decode(&[255, 255, 255, 253]).is_err());
    }

    #[test]
    fn test_new_value_all_types() {
        let hello: Vec<u8> = vec![104, 101, 108, 108, 111];

        let cases: Vec<(Value, Vec<u8>)> = vec![
            (Value::new("hello"), hello.clone()),
            (Value::new("hello".to_string()), hello),
            (Value::new(1_u8), vec![1]),
            (Value::new(1_u16), vec![0, 1]),
            (Value::new(1_u32), vec![0, 0, 0, 1]),
            (Value::new(1_u64), vec![0, 0, 0, 0, 0, 0, 0, 1]),
            (Value::new(1_i8), vec![1]),
            (Value::new(1_i16), vec![0, 1]),
            (Value::new(1_i32), vec![0, 0, 0, 1]),
            (Value::new(1_i64), vec![0, 0, 0, 0, 0, 0, 0, 1]),
            (Value::new(true), vec![1]),
            (
                Value::new(Duration::new(100, 200, 300).unwrap()),
                vec![200, 1, 144, 3, 216, 4],
            ),
        ];

        for (actual, expected_payload) in cases {
            assert_eq!(actual, Value::Some(expected_payload));
        }
    }
}