/// Kind of operation carried by a [`CdcEvent`].
///
/// NOTE(review): the variant names look like the Redis-style commands of the
/// same name (`SET`, `DEL`, `EXPIRE`, `HSET`, `LPUSH`, `RPUSH`, `SADD`,
/// `FLUSHDB`); this file only stores and compares the tag, so confirm the
/// exact semantics against the code that produces events.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CdcOp {
    /// Presumably a plain key/value write.
    Set,
    /// Presumably a key deletion.
    Del,
    /// Presumably a key expiry / TTL change.
    Expire,
    /// Presumably a hash-field write.
    HSet,
    /// Presumably a push to the head of a list.
    LPush,
    /// Presumably a push to the tail of a list.
    RPush,
    /// Presumably a set-member insertion.
    SAdd,
    /// Presumably a whole-database flush.
    FlushDb,
}
28
29#[derive(Debug, Clone)]
35pub struct CdcEvent {
36 pub seq: u64,
38 pub timestamp_ms: u64,
40 pub op: CdcOp,
42 pub key: Vec<u8>,
44 pub value: Option<Vec<u8>>,
46}
47
48pub struct CdcRing {
57 buffer: Vec<Option<CdcEvent>>,
58 capacity: usize,
59 write_seq: u64,
60 start_seq: u64,
61}
62
63impl CdcRing {
64 pub fn new(capacity: usize) -> Self {
70 assert!(capacity > 0, "CDC ring capacity must be > 0");
71 let mut buffer = Vec::with_capacity(capacity);
72 for _ in 0..capacity {
73 buffer.push(None);
74 }
75 Self {
76 buffer,
77 capacity,
78 write_seq: 0,
79 start_seq: 0,
80 }
81 }
82
83 pub fn push(&mut self, op: CdcOp, key: Vec<u8>, value: Option<Vec<u8>>, timestamp_ms: u64) {
88 let seq = self.write_seq;
89 let idx = (seq as usize) % self.capacity;
90
91 self.buffer[idx] = Some(CdcEvent {
92 seq,
93 timestamp_ms,
94 op,
95 key,
96 value,
97 });
98
99 self.write_seq += 1;
100
101 if self.write_seq > self.capacity as u64 {
102 self.start_seq = self.write_seq - self.capacity as u64;
103 }
104 }
105
106 pub fn write_seq(&self) -> u64 {
108 self.write_seq
109 }
110
111 pub fn start_seq(&self) -> u64 {
113 self.start_seq
114 }
115
116 pub fn len(&self) -> usize {
118 (self.write_seq - self.start_seq) as usize
119 }
120
121 pub fn is_empty(&self) -> bool {
123 self.write_seq == 0
124 }
125
126 pub fn read(&self, from_seq: u64, limit: usize) -> CdcReadResult {
132 if from_seq >= self.write_seq {
133 return CdcReadResult {
134 events: vec![],
135 next_seq: self.write_seq,
136 gap: false,
137 };
138 }
139
140 let actual_start = if from_seq < self.start_seq {
141 self.start_seq
142 } else {
143 from_seq
144 };
145
146 let gap = from_seq < self.start_seq;
147 let available = (self.write_seq - actual_start) as usize;
148 let count = available.min(limit);
149
150 let mut events = Vec::with_capacity(count);
151 for i in 0..count {
152 let seq = actual_start + i as u64;
153 let idx = (seq as usize) % self.capacity;
154 if let Some(ref event) = self.buffer[idx] {
155 events.push(event.clone());
156 }
157 }
158
159 CdcReadResult {
160 next_seq: actual_start + count as u64,
161 events,
162 gap,
163 }
164 }
165
166 pub fn get(&self, seq: u64) -> Option<&CdcEvent> {
170 if seq < self.start_seq || seq >= self.write_seq {
171 return None;
172 }
173 let idx = (seq as usize) % self.capacity;
174 self.buffer[idx].as_ref()
175 }
176}
177
178#[derive(Debug)]
180pub struct CdcReadResult {
181 pub events: Vec<CdcEvent>,
183 pub next_seq: u64,
185 pub gap: bool,
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a ring with `capacity` slots and pushes `count` `Set` events
    /// keyed `k0`, `k1`, ... whose timestamps equal their index.
    fn filled_ring(capacity: usize, count: u64) -> CdcRing {
        let mut ring = CdcRing::new(capacity);
        for n in 0..count {
            ring.push(CdcOp::Set, format!("k{}", n).into_bytes(), None, n);
        }
        ring
    }

    /// Three events in a roomy ring are all returned, in push order.
    #[test]
    fn test_push_and_read() {
        let mut ring = CdcRing::new(10);
        ring.push(CdcOp::Set, b"k1".to_vec(), Some(b"v1".to_vec()), 100);
        ring.push(CdcOp::Set, b"k2".to_vec(), Some(b"v2".to_vec()), 200);
        ring.push(CdcOp::Del, b"k1".to_vec(), None, 300);

        assert_eq!(ring.len(), 3);
        assert_eq!(ring.write_seq(), 3);
        assert_eq!(ring.start_seq(), 0);

        let CdcReadResult {
            events,
            next_seq,
            gap,
        } = ring.read(0, 100);
        assert_eq!(events.len(), 3);
        assert!(!gap);
        assert_eq!(next_seq, 3);
        assert_eq!(events[0].op, CdcOp::Set);
        assert_eq!(events[0].key, b"k1");
        assert_eq!(events[2].op, CdcOp::Del);
    }

    /// `limit` caps the batch size and `next_seq` lets the reader resume.
    #[test]
    fn test_read_with_limit() {
        let ring = filled_ring(10, 5);

        let first_page = ring.read(0, 2);
        assert_eq!(first_page.events.len(), 2);
        assert_eq!(first_page.next_seq, 2);

        let second_page = ring.read(2, 2);
        assert_eq!(second_page.events.len(), 2);
        assert_eq!(second_page.next_seq, 4);
    }

    /// Overfilling a small ring keeps only the newest `capacity` events.
    #[test]
    fn test_wrapping() {
        let ring = filled_ring(4, 10);

        assert_eq!(ring.len(), 4);
        assert_eq!(ring.start_seq(), 6);
        assert_eq!(ring.write_seq(), 10);

        let result = ring.read(0, 100);
        assert!(result.gap);
        assert_eq!(result.events.len(), 4);

        let seqs: Vec<u64> = result.events.iter().map(|event| event.seq).collect();
        assert_eq!(seqs.first(), Some(&6));
        assert_eq!(seqs.last(), Some(&9));
    }

    /// Reading from an overwritten sequence reports a gap and starts at the
    /// oldest retained event.
    #[test]
    fn test_gap_detection() {
        let ring = filled_ring(4, 10);

        let result = ring.read(3, 100);
        assert!(result.gap);
        assert_eq!(result.events[0].seq, 6);
    }

    /// Reading exactly at the write head yields nothing.
    #[test]
    fn test_read_at_write_head() {
        let mut ring = CdcRing::new(10);
        ring.push(CdcOp::Set, b"k".to_vec(), None, 0);

        let CdcReadResult {
            events, next_seq, ..
        } = ring.read(1, 100);
        assert!(events.is_empty());
        assert_eq!(next_seq, 1);
    }

    /// `get` returns individual events and `None` past the write head.
    #[test]
    fn test_get_single() {
        let mut ring = CdcRing::new(10);
        ring.push(CdcOp::Set, b"k0".to_vec(), None, 0);
        ring.push(CdcOp::Del, b"k1".to_vec(), None, 1);

        let first = ring.get(0).unwrap();
        assert_eq!(first.op, CdcOp::Set);

        let second = ring.get(1).unwrap();
        assert_eq!(second.op, CdcOp::Del);

        assert!(ring.get(2).is_none());
    }

    /// A fresh ring is empty and reads return no events.
    #[test]
    fn test_empty_ring() {
        let ring = CdcRing::new(10);
        assert!(ring.is_empty());
        assert_eq!(ring.len(), 0);

        assert!(ring.read(0, 100).events.is_empty());
    }

    /// Every operation kind round-trips through the ring unchanged.
    #[test]
    fn test_all_ops() {
        let ops = [
            CdcOp::Set,
            CdcOp::Del,
            CdcOp::Expire,
            CdcOp::HSet,
            CdcOp::LPush,
            CdcOp::RPush,
            CdcOp::SAdd,
            CdcOp::FlushDb,
        ];

        let mut ring = CdcRing::new(100);
        for (i, op) in ops.iter().enumerate() {
            ring.push(op.clone(), format!("k{}", i).into_bytes(), None, i as u64);
        }

        let result = ring.read(0, 100);
        assert_eq!(result.events.len(), 8);
        for (event, expected) in result.events.iter().zip(ops.iter()) {
            assert_eq!(event.op, *expected);
        }
    }
}