scylla_cdc/
cdc_types.rs

1//! A module containing types related to CDC internal structure.
2use scylla::deserialize::value::DeserializeValue;
3use scylla::frame::response::result::ColumnType;
4use scylla::serialize::SerializationError;
5use scylla::serialize::value::SerializeValue;
6use scylla::serialize::writers::{CellWriter, WrittenCellProof};
7use scylla::value::CqlTimestamp;
8use std::fmt;
9
10/// A struct representing a timestamp of a stream generation.
11#[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
12pub struct GenerationTimestamp {
13    pub(crate) timestamp: chrono::Duration,
14}
15
16impl fmt::Display for GenerationTimestamp {
17    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
18        write!(f, "{}", self.timestamp.num_milliseconds())
19    }
20}
21
22impl SerializeValue for GenerationTimestamp {
23    fn serialize<'b>(
24        &self,
25        typ: &ColumnType,
26        writer: CellWriter<'b>,
27    ) -> Result<WrittenCellProof<'b>, SerializationError> {
28        CqlTimestamp(self.timestamp.num_milliseconds()).serialize(typ, writer)
29    }
30}
31
32impl<'frame, 'metadata> DeserializeValue<'frame, 'metadata> for GenerationTimestamp {
33    fn type_check(typ: &ColumnType) -> Result<(), scylla::deserialize::TypeCheckError> {
34        <CqlTimestamp as DeserializeValue<'frame, 'metadata>>::type_check(typ)
35    }
36
37    fn deserialize(
38        typ: &'metadata ColumnType<'metadata>,
39        v: Option<scylla::deserialize::FrameSlice<'frame>>,
40    ) -> Result<Self, scylla::deserialize::DeserializationError> {
41        let timestamp = <CqlTimestamp as DeserializeValue<'frame, 'metadata>>::deserialize(typ, v)?;
42
43        Ok(GenerationTimestamp {
44            timestamp: chrono::Duration::milliseconds(timestamp.0),
45        })
46    }
47}
48
/// The identifier of a CDC stream.
///
/// The identifier is kept as an owned sequence of raw bytes.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct StreamID {
    /// Raw bytes of the stream identifier.
    pub(crate) id: Vec<u8>,
}
54
55impl fmt::Display for StreamID {
56    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
57        let encoded_stream_id = hex::encode(self.id.clone());
58        write!(f, "{encoded_stream_id}")
59    }
60}
61
62impl SerializeValue for StreamID {
63    fn serialize<'b>(
64        &self,
65        typ: &ColumnType,
66        writer: CellWriter<'b>,
67    ) -> Result<WrittenCellProof<'b>, SerializationError> {
68        self.id.serialize(typ, writer)
69    }
70}
71
72impl<'frame, 'metadata> DeserializeValue<'frame, 'metadata> for StreamID {
73    fn type_check(typ: &ColumnType) -> Result<(), scylla::deserialize::TypeCheckError> {
74        <Vec<u8> as DeserializeValue<'frame, 'metadata>>::type_check(typ)
75    }
76
77    fn deserialize(
78        typ: &'metadata ColumnType<'metadata>,
79        v: Option<scylla::deserialize::FrameSlice<'frame>>,
80    ) -> Result<Self, scylla::deserialize::DeserializationError> {
81        let id = <Vec<u8> as DeserializeValue<'frame, 'metadata>>::deserialize(typ, v)?;
82        Ok(StreamID { id })
83    }
84}
85
86impl StreamID {
87    pub fn new(stream_id: Vec<u8>) -> StreamID {
88        StreamID { id: stream_id }
89    }
90}