reifydb_core/interface/
cdc.rs1use reifydb_type::value::datetime::DateTime;
5use serde::{Deserialize, Serialize};
6
7use crate::{
8 common::CommitVersion,
9 encoded::{key::EncodedKey, row::EncodedRow},
10 interface::change::Change,
11};
12
/// Identifier of a CDC consumer, stored as an owned string.
///
/// The id `"__FLOW_CONSUMER"` is reserved: [`CdcConsumerId::new`] refuses it,
/// and the only way to obtain it is [`CdcConsumerId::flow_consumer`].
#[repr(transparent)]
#[derive(Debug, Clone, PartialOrd, PartialEq, Ord, Eq, Hash)]
pub struct CdcConsumerId(pub(crate) String);

impl CdcConsumerId {
    /// The reserved id. Kept in a single place so that the check in `new`
    /// and the value produced by `flow_consumer` cannot drift apart.
    const FLOW_CONSUMER_ID: &'static str = "__FLOW_CONSUMER";

    /// Builds a consumer id from anything convertible into a `String`.
    ///
    /// # Panics
    ///
    /// Panics if `id` equals the reserved id `"__FLOW_CONSUMER"`; use
    /// [`CdcConsumerId::flow_consumer`] to obtain that one.
    pub fn new(id: impl Into<String>) -> Self {
        let id = id.into();
        assert_ne!(
            id,
            Self::FLOW_CONSUMER_ID,
            "consumer id {:?} is reserved; use CdcConsumerId::flow_consumer() instead",
            Self::FLOW_CONSUMER_ID
        );
        Self(id)
    }

    /// Returns the consumer id holding the reserved `"__FLOW_CONSUMER"` value.
    pub fn flow_consumer() -> Self {
        Self(Self::FLOW_CONSUMER_ID.to_string())
    }
}

impl AsRef<str> for CdcConsumerId {
    /// Borrows the underlying id as a string slice.
    fn as_ref(&self) -> &str {
        &self.0
    }
}
34
35#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
36pub enum SystemChange {
37 Insert {
38 key: EncodedKey,
39 post: EncodedRow,
40 },
41 Update {
42 key: EncodedKey,
43 pre: EncodedRow,
44 post: EncodedRow,
45 },
46 Delete {
47 key: EncodedKey,
48 pre: Option<EncodedRow>,
49 },
50}
51
52impl SystemChange {
53 pub fn key(&self) -> &EncodedKey {
54 match self {
55 SystemChange::Insert {
56 key,
57 ..
58 } => key,
59 SystemChange::Update {
60 key,
61 ..
62 } => key,
63 SystemChange::Delete {
64 key,
65 ..
66 } => key,
67 }
68 }
69
70 pub fn value_bytes(&self) -> usize {
71 match self {
72 SystemChange::Insert {
73 post,
74 ..
75 } => post.len(),
76 SystemChange::Update {
77 pre,
78 post,
79 ..
80 } => pre.len() + post.len(),
81 SystemChange::Delete {
82 pre,
83 ..
84 } => pre.as_ref().map(|p| p.len()).unwrap_or(0),
85 }
86 }
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct Cdc {
91 pub version: CommitVersion,
92 pub timestamp: DateTime,
93
94 pub changes: Vec<Change>,
95
96 pub system_changes: Vec<SystemChange>,
97}
98
99impl Cdc {
100 pub fn new(
101 version: CommitVersion,
102 timestamp: DateTime,
103 changes: Vec<Change>,
104 system_changes: Vec<SystemChange>,
105 ) -> Self {
106 Self {
107 version,
108 timestamp,
109 changes,
110 system_changes,
111 }
112 }
113}
114
/// Pairs a consumer id with a commit version recorded as its checkpoint.
// NOTE(review): presumably `checkpoint` is the last version the consumer has
// processed — confirm whether it is inclusive or exclusive at the call sites.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsumerState {
    /// Which consumer this state belongs to.
    pub consumer_id: CdcConsumerId,
    /// Commit version stored as this consumer's checkpoint.
    pub checkpoint: CommitVersion,
}
120
121#[derive(Debug, Clone)]
122pub struct CdcBatch {
123 pub items: Vec<Cdc>,
124
125 pub has_more: bool,
126}
127
128impl CdcBatch {
129 pub fn empty() -> Self {
130 Self {
131 items: Vec::new(),
132 has_more: false,
133 }
134 }
135
136 pub fn is_empty(&self) -> bool {
137 self.items.is_empty()
138 }
139}