reifydb_core/interface/
cdc.rs1use reifydb_type::internal_error;
5use serde::{Deserialize, Serialize};
6
7use super::{CdcConsumerKeyRange, EncodableKey, QueryTransaction};
8use crate::{CommitVersion, EncodedKey, key::CdcConsumerKey, value::encoded::EncodedValues};
9
10#[repr(transparent)]
11#[derive(Debug, Clone, PartialOrd, PartialEq, Ord, Eq, Hash)]
12pub struct CdcConsumerId(pub(crate) String);
13
14impl CdcConsumerId {
15 pub fn new(id: impl Into<String>) -> Self {
16 let id = id.into();
17 assert_ne!(id, "__FLOW_CONSUMER");
18 Self(id)
19 }
20
21 pub fn flow_consumer() -> Self {
22 Self("__FLOW_CONSUMER".to_string())
23 }
24}
25
26impl AsRef<str> for CdcConsumerId {
27 fn as_ref(&self) -> &str {
28 &self.0
29 }
30}
31
32#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
33pub enum CdcChange {
34 Insert {
35 key: EncodedKey,
36 post: EncodedValues,
37 },
38 Update {
39 key: EncodedKey,
40 pre: EncodedValues,
41 post: EncodedValues,
42 },
43 Delete {
44 key: EncodedKey,
45 pre: Option<EncodedValues>,
46 },
47}
48
49#[derive(Debug, Clone, PartialEq)]
51pub struct Cdc {
52 pub version: CommitVersion,
53 pub timestamp: u64,
54 pub changes: Vec<CdcSequencedChange>,
55}
56
57impl Cdc {
58 pub fn new(version: CommitVersion, timestamp: u64, changes: Vec<CdcSequencedChange>) -> Self {
59 Self {
60 version,
61 timestamp,
62 changes,
63 }
64 }
65}
66
67#[derive(Debug, Clone, PartialEq)]
69pub struct CdcSequencedChange {
70 pub sequence: u16,
71 pub change: CdcChange,
72}
73
74impl CdcSequencedChange {
75 pub fn key(&self) -> &EncodedKey {
76 match &self.change {
77 CdcChange::Insert {
78 key,
79 ..
80 } => key,
81 CdcChange::Update {
82 key,
83 ..
84 } => key,
85 CdcChange::Delete {
86 key,
87 ..
88 } => key,
89 }
90 }
91}
92
93#[derive(Debug, Clone, PartialEq, Eq)]
95pub struct ConsumerState {
96 pub consumer_id: CdcConsumerId,
97 pub checkpoint: CommitVersion,
98}
99
100pub fn get_all_consumer_states<T: QueryTransaction>(txn: &mut T) -> reifydb_type::Result<Vec<ConsumerState>> {
102 let mut states = Vec::new();
103
104 for multi in txn.range(CdcConsumerKeyRange::full_scan())? {
105 let key = CdcConsumerKey::decode(&multi.key)
106 .ok_or_else(|| internal_error!("Unable to decode CdConsumerKey"))?;
107
108 if multi.values.len() >= 8 {
109 let mut buffer = [0u8; 8];
110 buffer.copy_from_slice(&multi.values[0..8]);
111 let checkpoint = CommitVersion(u64::from_be_bytes(buffer));
112
113 states.push(ConsumerState {
114 consumer_id: key.consumer,
115 checkpoint,
116 });
117 }
118 }
119
120 Ok(states)
121}