use crate::{LogEntry, ReplicationLog, Result};
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::mpsc;
use uuid::Uuid;

/// The kind of change described by a [`ChangeEvent`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangeOperation {
    /// A new document was inserted.
    Insert,
    /// An existing document was updated.
    Update,
    /// A document was deleted.
    Delete,
    /// A bulk operation covering multiple documents.
    Bulk,
}

/// A single change emitted by a replication stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeEvent {
    /// Unique identifier of the event.
    pub id: Uuid,
    /// Sequence number taken from the replication log.
    pub sequence: u64,
    /// When the event was created.
    pub timestamp: DateTime<Utc>,
    /// The operation that produced the change.
    pub operation: ChangeOperation,
    /// Collection the change belongs to.
    pub collection: String,
    /// Identifier of the affected document.
    pub document_id: String,
    /// Raw payload of the change.
    pub data: Vec<u8>,
    /// Arbitrary JSON metadata attached to the event.
    pub metadata: serde_json::Value,
}

impl ChangeEvent {
    /// Create a new change event with a fresh id and the current timestamp.
    pub fn new(
        sequence: u64,
        operation: ChangeOperation,
        collection: String,
        document_id: String,
        data: Vec<u8>,
    ) -> Self {
        Self {
            id: Uuid::new_v4(),
            sequence,
            timestamp: Utc::now(),
            operation,
            collection,
            document_id,
            data,
            metadata: serde_json::Value::Null,
        }
    }

    /// Attach JSON metadata to the event.
    pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
        self.metadata = metadata;
        self
    }

    /// Build a change event from a replication log entry, preserving its id,
    /// sequence, and timestamp and recording the source replica and checksum
    /// as metadata.
    pub fn from_log_entry(
        entry: &LogEntry,
        operation: ChangeOperation,
        collection: String,
        document_id: String,
    ) -> Self {
        Self {
            id: entry.id,
            sequence: entry.sequence,
            timestamp: entry.timestamp,
            operation,
            collection,
            document_id,
            data: entry.data.clone(),
            metadata: serde_json::json!({
                "source_replica": entry.source_replica,
                "checksum": entry.checksum,
            }),
        }
    }
}

/// A consumer's position in the replication log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Checkpoint {
    /// Last sequence number the consumer has processed.
    pub sequence: u64,
    /// When the checkpoint was recorded.
    pub timestamp: DateTime<Utc>,
    /// Optional consumer group the checkpoint belongs to.
    pub consumer_group: Option<String>,
    /// Identifier of the consumer that recorded the checkpoint.
    pub consumer_id: String,
}

impl Checkpoint {
    /// Create a checkpoint at the given sequence for a consumer.
    pub fn new(sequence: u64, consumer_id: impl Into<String>) -> Self {
        Self {
            sequence,
            timestamp: Utc::now(),
            consumer_group: None,
            consumer_id: consumer_id.into(),
        }
    }

    /// Associate the checkpoint with a consumer group.
    pub fn with_group(mut self, group: impl Into<String>) -> Self {
        self.consumer_group = Some(group.into());
        self
    }
}

/// Configuration for a [`ReplicationStream`].
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Capacity of the channel used to deliver event batches.
    pub buffer_size: usize,
    /// Maximum number of log entries fetched per batch.
    pub batch_size: usize,
    /// Whether checkpoints are recorded automatically while streaming.
    pub auto_checkpoint: bool,
    /// Number of events to process between automatic checkpoints.
    pub checkpoint_interval: usize,
}

impl Default for StreamConfig {
    fn default() -> Self {
        Self {
            buffer_size: 1000,
            batch_size: 100,
            auto_checkpoint: true,
            checkpoint_interval: 100,
        }
    }
}

/// Streams change events from a [`ReplicationLog`] to a single consumer,
/// tracking the consumer's progress with an optional [`Checkpoint`].
pub struct ReplicationStream {
    log: Arc<ReplicationLog>,
    config: StreamConfig,
    checkpoint: Arc<RwLock<Option<Checkpoint>>>,
    consumer_id: String,
}

impl ReplicationStream {
    /// Create a stream over `log` with the default configuration.
    pub fn new(log: Arc<ReplicationLog>, consumer_id: impl Into<String>) -> Self {
        Self {
            log,
            config: StreamConfig::default(),
            checkpoint: Arc::new(RwLock::new(None)),
            consumer_id: consumer_id.into(),
        }
    }

    /// Create a stream over `log` with a custom configuration.
    pub fn with_config(
        log: Arc<ReplicationLog>,
        consumer_id: impl Into<String>,
        config: StreamConfig,
    ) -> Self {
        Self {
            log,
            config,
            checkpoint: Arc::new(RwLock::new(None)),
            consumer_id: consumer_id.into(),
        }
    }

    /// Start streaming change events that occur after `start_sequence`.
    ///
    /// Spawns a background task that polls the log in batches and delivers
    /// them over the returned channel until the receiver is dropped.
    pub async fn stream_from(
        &self,
        start_sequence: u64,
    ) -> Result<mpsc::Receiver<Vec<ChangeEvent>>> {
        let (tx, rx) = mpsc::channel(self.config.buffer_size);

        let log = self.log.clone();
        let batch_size = self.config.batch_size;
        let checkpoint = self.checkpoint.clone();
        let auto_checkpoint = self.config.auto_checkpoint;
        let checkpoint_interval = self.config.checkpoint_interval;
        let consumer_id = self.consumer_id.clone();

        tokio::spawn(async move {
            let mut current_sequence = start_sequence;
            let mut events_since_checkpoint = 0;

            loop {
                // Fetch the next batch of entries after the current position.
                let entries =
                    log.get_range(current_sequence + 1, current_sequence + batch_size as u64);

                if entries.is_empty() {
                    // Caught up with the log; wait briefly before polling again.
                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                    continue;
                }

                // Log entries do not carry operation, collection, or document
                // metadata, so placeholder values are used for those fields.
                let mut events = Vec::new();
                for entry in &entries {
                    let event = ChangeEvent::from_log_entry(
                        entry,
                        ChangeOperation::Update,
                        "default".to_string(),
                        Uuid::new_v4().to_string(),
                    );
                    events.push(event);
                }

                if let Some(last_entry) = entries.last() {
                    current_sequence = last_entry.sequence;
                }

                // Stop streaming once the receiver has been dropped.
                if tx.send(events).await.is_err() {
                    break;
                }

                events_since_checkpoint += entries.len();

                if auto_checkpoint && events_since_checkpoint >= checkpoint_interval {
                    let cp = Checkpoint::new(current_sequence, consumer_id.clone());
                    *checkpoint.write() = Some(cp);
                    events_since_checkpoint = 0;
                }
            }
        });

        Ok(rx)
    }

    /// Resume streaming from the last recorded checkpoint, or from the start
    /// of the log if no checkpoint has been set.
    pub async fn resume(&self) -> Result<mpsc::Receiver<Vec<ChangeEvent>>> {
        let checkpoint = self.checkpoint.read();
        let start_sequence = checkpoint.as_ref().map(|cp| cp.sequence).unwrap_or(0);
        drop(checkpoint);

        self.stream_from(start_sequence).await
    }

    /// Return the current checkpoint, if any.
    pub fn get_checkpoint(&self) -> Option<Checkpoint> {
        self.checkpoint.read().clone()
    }

    /// Overwrite the current checkpoint.
    pub fn set_checkpoint(&self, checkpoint: Checkpoint) {
        *self.checkpoint.write() = Some(checkpoint);
    }

    /// Remove the current checkpoint.
    pub fn clear_checkpoint(&self) {
        *self.checkpoint.write() = None;
    }
}

/// Creates and tracks [`ReplicationStream`]s over a shared replication log.
pub struct StreamManager {
    log: Arc<ReplicationLog>,
    streams: Arc<RwLock<Vec<Arc<ReplicationStream>>>>,
}

impl StreamManager {
    /// Create a manager over the given replication log.
    pub fn new(log: Arc<ReplicationLog>) -> Self {
        Self {
            log,
            streams: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Create and register a stream with the default configuration.
    pub fn create_stream(&self, consumer_id: impl Into<String>) -> Arc<ReplicationStream> {
        let stream = Arc::new(ReplicationStream::new(self.log.clone(), consumer_id));
        self.streams.write().push(stream.clone());
        stream
    }

    /// Create and register a stream with a custom configuration.
    pub fn create_stream_with_config(
        &self,
        consumer_id: impl Into<String>,
        config: StreamConfig,
    ) -> Arc<ReplicationStream> {
        let stream = Arc::new(ReplicationStream::with_config(
            self.log.clone(),
            consumer_id,
            config,
        ));
        self.streams.write().push(stream.clone());
        stream
    }

    /// Return all streams created through this manager.
    pub fn active_streams(&self) -> Vec<Arc<ReplicationStream>> {
        self.streams.read().clone()
    }

    /// Number of streams created through this manager.
    pub fn stream_count(&self) -> usize {
        self.streams.read().len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_change_event_creation() {
        let event = ChangeEvent::new(
            1,
            ChangeOperation::Insert,
            "vectors".to_string(),
            "doc-1".to_string(),
            b"data".to_vec(),
        );

        assert_eq!(event.sequence, 1);
        assert_eq!(event.operation, ChangeOperation::Insert);
        assert_eq!(event.collection, "vectors");
    }

    #[test]
    fn test_checkpoint() {
        let cp = Checkpoint::new(100, "consumer-1").with_group("group-1");

        assert_eq!(cp.sequence, 100);
        assert_eq!(cp.consumer_id, "consumer-1");
        assert_eq!(cp.consumer_group, Some("group-1".to_string()));
    }

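    // Added example: a minimal sketch showing that a checkpoint survives a
    // JSON round trip, relying only on the Serialize/Deserialize derives and
    // the serde_json dependency already used in this module.
    #[test]
    fn test_checkpoint_serde_round_trip() {
        let cp = Checkpoint::new(42, "consumer-1").with_group("group-1");

        let json = serde_json::to_string(&cp).unwrap();
        let restored: Checkpoint = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.sequence, 42);
        assert_eq!(restored.consumer_id, "consumer-1");
        assert_eq!(restored.consumer_group, Some("group-1".to_string()));
    }
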
    #[tokio::test]
    async fn test_replication_stream() {
        let log = Arc::new(ReplicationLog::new("replica-1"));

        log.append(b"data1".to_vec());
        log.append(b"data2".to_vec());
        log.append(b"data3".to_vec());

        let stream = ReplicationStream::new(log.clone(), "consumer-1");
        let mut rx = stream.stream_from(0).await.unwrap();

        if let Some(events) = rx.recv().await {
            assert!(!events.is_empty());
        }
    }

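    // Added example: a minimal sketch of checkpointing and resuming. It
    // assumes, as test_replication_stream does, that appended entries are
    // assigned sequence numbers starting at 1.
    #[tokio::test]
    async fn test_resume_from_checkpoint() {
        let log = Arc::new(ReplicationLog::new("replica-1"));

        log.append(b"data1".to_vec());
        log.append(b"data2".to_vec());

        let stream = ReplicationStream::new(log.clone(), "consumer-1");

        // Pretend the first entry has already been processed.
        stream.set_checkpoint(Checkpoint::new(1, "consumer-1"));
        assert_eq!(stream.get_checkpoint().map(|cp| cp.sequence), Some(1));

        // resume() starts streaming after the checkpointed sequence.
        let mut rx = stream.resume().await.unwrap();
        if let Some(events) = rx.recv().await {
            assert!(events.iter().all(|event| event.sequence > 1));
        }

        stream.clear_checkpoint();
        assert!(stream.get_checkpoint().is_none());
    }
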
    #[test]
    fn test_stream_manager() {
        let log = Arc::new(ReplicationLog::new("replica-1"));
        let manager = StreamManager::new(log);

        let _stream1 = manager.create_stream("consumer-1");
        let _stream2 = manager.create_stream("consumer-2");

        assert_eq!(manager.stream_count(), 2);
    }

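    // Added example: covers create_stream_with_config and active_streams,
    // which the existing manager test does not exercise.
    #[test]
    fn test_stream_manager_with_config() {
        let log = Arc::new(ReplicationLog::new("replica-1"));
        let manager = StreamManager::new(log);

        let config = StreamConfig {
            buffer_size: 10,
            batch_size: 5,
            auto_checkpoint: false,
            checkpoint_interval: 50,
        };

        let _stream = manager.create_stream_with_config("consumer-1", config);

        assert_eq!(manager.stream_count(), 1);
        assert_eq!(manager.active_streams().len(), 1);
    }
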
    #[test]
    fn test_stream_config() {
        let config = StreamConfig {
            buffer_size: 2000,
            batch_size: 50,
            auto_checkpoint: false,
            checkpoint_interval: 200,
        };

        assert_eq!(config.buffer_size, 2000);
        assert_eq!(config.batch_size, 50);
        assert!(!config.auto_checkpoint);
    }
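
    // Added example: a sketch of building a ChangeEvent from a real log
    // entry. It assumes, as the streaming loop does, that get_range returns
    // entries by sequence and that append stores the raw payload in the
    // entry's data field.
    #[test]
    fn test_change_event_from_log_entry() {
        let log = ReplicationLog::new("replica-1");
        log.append(b"payload".to_vec());

        let entries = log.get_range(1, 10);
        assert!(!entries.is_empty());

        let event = ChangeEvent::from_log_entry(
            &entries[0],
            ChangeOperation::Insert,
            "vectors".to_string(),
            "doc-1".to_string(),
        );

        assert_eq!(event.sequence, entries[0].sequence);
        assert_eq!(event.operation, ChangeOperation::Insert);
        assert_eq!(event.data, b"payload".to_vec());
        assert!(event.metadata.get("source_replica").is_some());
    }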
}