// reifydb_core/interface/cdc.rs
use reifydb_type::value::datetime::DateTime;
use serde::{Deserialize, Serialize};

use crate::{
    common::CommitVersion,
    encoded::{key::EncodedKey, row::EncodedRow},
    interface::change::Change,
};
12
/// Identifier of a CDC consumer, stored as the consumer's name.
///
/// The name `"__FLOW_CONSUMER"` is reserved: [`CdcConsumerId::new`] refuses
/// it, and within this type it is only produced by
/// [`CdcConsumerId::flow_consumer`].
#[repr(transparent)]
#[derive(Debug, Clone, PartialOrd, PartialEq, Ord, Eq, Hash)]
pub struct CdcConsumerId(pub(crate) String);

impl CdcConsumerId {
    /// Reserved name; kept in one place so `new` and `flow_consumer` cannot
    /// drift apart.
    const FLOW_CONSUMER: &'static str = "__FLOW_CONSUMER";

    /// Creates a consumer id from anything convertible into a `String`.
    ///
    /// # Panics
    ///
    /// Panics if `id` equals the reserved name `"__FLOW_CONSUMER"`.
    pub fn new(id: impl Into<String>) -> Self {
        let id = id.into();
        assert_ne!(id, Self::FLOW_CONSUMER);
        Self(id)
    }

    /// Returns the consumer id that carries the reserved name
    /// `"__FLOW_CONSUMER"`.
    pub fn flow_consumer() -> Self {
        Self(Self::FLOW_CONSUMER.to_string())
    }
}

impl AsRef<str> for CdcConsumerId {
    /// Borrows the underlying name as a string slice.
    fn as_ref(&self) -> &str {
        self.0.as_str()
    }
}
34
35#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
38pub enum SystemChange {
39 Insert {
40 key: EncodedKey,
41 post: EncodedRow,
42 },
43 Update {
44 key: EncodedKey,
45 pre: EncodedRow,
46 post: EncodedRow,
47 },
48 Delete {
49 key: EncodedKey,
50 pre: Option<EncodedRow>,
51 },
52}
53
54impl SystemChange {
55 pub fn key(&self) -> &EncodedKey {
57 match self {
58 SystemChange::Insert {
59 key,
60 ..
61 } => key,
62 SystemChange::Update {
63 key,
64 ..
65 } => key,
66 SystemChange::Delete {
67 key,
68 ..
69 } => key,
70 }
71 }
72
73 pub fn value_bytes(&self) -> usize {
75 match self {
76 SystemChange::Insert {
77 post,
78 ..
79 } => post.len(),
80 SystemChange::Update {
81 pre,
82 post,
83 ..
84 } => pre.len() + post.len(),
85 SystemChange::Delete {
86 pre,
87 ..
88 } => pre.as_ref().map(|p| p.len()).unwrap_or(0),
89 }
90 }
91}
92
93#[derive(Debug, Clone)]
95pub struct Cdc {
96 pub version: CommitVersion,
97 pub timestamp: DateTime,
98 pub changes: Vec<Change>,
100 pub system_changes: Vec<SystemChange>,
102}
103
104impl Cdc {
105 pub fn new(
106 version: CommitVersion,
107 timestamp: DateTime,
108 changes: Vec<Change>,
109 system_changes: Vec<SystemChange>,
110 ) -> Self {
111 Self {
112 version,
113 timestamp,
114 changes,
115 system_changes,
116 }
117 }
118}
119
120#[derive(Debug, Clone, PartialEq, Eq)]
122pub struct ConsumerState {
123 pub consumer_id: CdcConsumerId,
124 pub checkpoint: CommitVersion,
125}
126
127#[derive(Debug, Clone)]
129pub struct CdcBatch {
130 pub items: Vec<Cdc>,
132 pub has_more: bool,
134}
135
136impl CdcBatch {
137 pub fn empty() -> Self {
139 Self {
140 items: Vec::new(),
141 has_more: false,
142 }
143 }
144
145 pub fn is_empty(&self) -> bool {
147 self.items.is_empty()
148 }
149}