ruvector_replication/
stream.rs

1//! Change data capture and streaming for replication
2//!
3//! Provides mechanisms for streaming changes from the replication log
4//! with support for checkpointing, resumption, and backpressure handling.
5
6use crate::{LogEntry, ReplicationError, ReplicationLog, Result};
7use chrono::{DateTime, Utc};
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use std::sync::Arc;
11use tokio::sync::mpsc;
12use uuid::Uuid;
13
14/// Type of change operation
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16pub enum ChangeOperation {
17    /// Insert operation
18    Insert,
19    /// Update operation
20    Update,
21    /// Delete operation
22    Delete,
23    /// Bulk operation
24    Bulk,
25}
26
27/// A change event in the replication stream
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct ChangeEvent {
30    /// Unique identifier for this event
31    pub id: Uuid,
32    /// Sequence number in the stream
33    pub sequence: u64,
34    /// Timestamp of the change
35    pub timestamp: DateTime<Utc>,
36    /// Type of operation
37    pub operation: ChangeOperation,
38    /// Collection/table name
39    pub collection: String,
40    /// Document/vector ID affected
41    pub document_id: String,
42    /// Serialized data for the change
43    pub data: Vec<u8>,
44    /// Metadata for the change
45    pub metadata: serde_json::Value,
46}
47
48impl ChangeEvent {
49    /// Create a new change event
50    pub fn new(
51        sequence: u64,
52        operation: ChangeOperation,
53        collection: String,
54        document_id: String,
55        data: Vec<u8>,
56    ) -> Self {
57        Self {
58            id: Uuid::new_v4(),
59            sequence,
60            timestamp: Utc::now(),
61            operation,
62            collection,
63            document_id,
64            data,
65            metadata: serde_json::Value::Null,
66        }
67    }
68
69    /// Add metadata to the change event
70    pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
71        self.metadata = metadata;
72        self
73    }
74
75    /// Convert from a log entry
76    pub fn from_log_entry(
77        entry: &LogEntry,
78        operation: ChangeOperation,
79        collection: String,
80        document_id: String,
81    ) -> Self {
82        Self {
83            id: entry.id,
84            sequence: entry.sequence,
85            timestamp: entry.timestamp,
86            operation,
87            collection,
88            document_id,
89            data: entry.data.clone(),
90            metadata: serde_json::json!({
91                "source_replica": entry.source_replica,
92                "checksum": entry.checksum,
93            }),
94        }
95    }
96}
97
98/// Checkpoint for resuming a replication stream
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct Checkpoint {
101    /// Last processed sequence number
102    pub sequence: u64,
103    /// Timestamp of the checkpoint
104    pub timestamp: DateTime<Utc>,
105    /// Optional consumer group ID
106    pub consumer_group: Option<String>,
107    /// Consumer ID within the group
108    pub consumer_id: String,
109}
110
111impl Checkpoint {
112    /// Create a new checkpoint
113    pub fn new(sequence: u64, consumer_id: impl Into<String>) -> Self {
114        Self {
115            sequence,
116            timestamp: Utc::now(),
117            consumer_group: None,
118            consumer_id: consumer_id.into(),
119        }
120    }
121
122    /// Set the consumer group
123    pub fn with_group(mut self, group: impl Into<String>) -> Self {
124        self.consumer_group = Some(group.into());
125        self
126    }
127}
128
/// Tunables for a replication stream.
#[derive(Clone, Debug)]
pub struct StreamConfig {
    /// Capacity of the channel that carries event batches to the consumer.
    pub buffer_size: usize,
    /// Number of events per batch.
    pub batch_size: usize,
    /// Whether checkpoints are recorded automatically while streaming.
    pub auto_checkpoint: bool,
    /// Number of events between automatic checkpoints.
    pub checkpoint_interval: usize,
}
141
142impl Default for StreamConfig {
143    fn default() -> Self {
144        Self {
145            buffer_size: 1000,
146            batch_size: 100,
147            auto_checkpoint: true,
148            checkpoint_interval: 100,
149        }
150    }
151}
152
153/// Manages a replication stream
154pub struct ReplicationStream {
155    /// The replication log
156    log: Arc<ReplicationLog>,
157    /// Stream configuration
158    config: StreamConfig,
159    /// Current checkpoint
160    checkpoint: Arc<RwLock<Option<Checkpoint>>>,
161    /// Consumer ID
162    consumer_id: String,
163}
164
165impl ReplicationStream {
166    /// Create a new replication stream
167    pub fn new(log: Arc<ReplicationLog>, consumer_id: impl Into<String>) -> Self {
168        Self {
169            log,
170            config: StreamConfig::default(),
171            checkpoint: Arc::new(RwLock::new(None)),
172            consumer_id: consumer_id.into(),
173        }
174    }
175
176    /// Create with custom configuration
177    pub fn with_config(
178        log: Arc<ReplicationLog>,
179        consumer_id: impl Into<String>,
180        config: StreamConfig,
181    ) -> Self {
182        Self {
183            log,
184            config,
185            checkpoint: Arc::new(RwLock::new(None)),
186            consumer_id: consumer_id.into(),
187        }
188    }
189
190    /// Start streaming from a given position
191    pub async fn stream_from(
192        &self,
193        start_sequence: u64,
194    ) -> Result<mpsc::Receiver<Vec<ChangeEvent>>> {
195        let (tx, rx) = mpsc::channel(self.config.buffer_size);
196
197        let log = self.log.clone();
198        let batch_size = self.config.batch_size;
199        let checkpoint = self.checkpoint.clone();
200        let auto_checkpoint = self.config.auto_checkpoint;
201        let checkpoint_interval = self.config.checkpoint_interval;
202        let consumer_id = self.consumer_id.clone();
203
204        tokio::spawn(async move {
205            let mut current_sequence = start_sequence;
206            let mut events_since_checkpoint = 0;
207
208            loop {
209                // Get batch of entries
210                let entries =
211                    log.get_range(current_sequence + 1, current_sequence + batch_size as u64);
212
213                if entries.is_empty() {
214                    // No new entries, wait a bit
215                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
216                    continue;
217                }
218
219                // Convert to change events
220                let mut events = Vec::new();
221                for entry in &entries {
222                    // In a real implementation, we would decode the operation type
223                    // from the entry data. For now, we use a placeholder.
224                    let event = ChangeEvent::from_log_entry(
225                        entry,
226                        ChangeOperation::Update,
227                        "default".to_string(),
228                        Uuid::new_v4().to_string(),
229                    );
230                    events.push(event);
231                }
232
233                // Update current sequence
234                if let Some(last_entry) = entries.last() {
235                    current_sequence = last_entry.sequence;
236                }
237
238                // Send batch
239                if tx.send(events).await.is_err() {
240                    // Receiver dropped, stop streaming
241                    break;
242                }
243
244                events_since_checkpoint += entries.len();
245
246                // Auto-checkpoint if enabled
247                if auto_checkpoint && events_since_checkpoint >= checkpoint_interval {
248                    let cp = Checkpoint::new(current_sequence, consumer_id.clone());
249                    *checkpoint.write() = Some(cp);
250                    events_since_checkpoint = 0;
251                }
252            }
253        });
254
255        Ok(rx)
256    }
257
258    /// Resume streaming from the last checkpoint
259    pub async fn resume(&self) -> Result<mpsc::Receiver<Vec<ChangeEvent>>> {
260        let checkpoint = self.checkpoint.read();
261        let start_sequence = checkpoint.as_ref().map(|cp| cp.sequence).unwrap_or(0);
262        drop(checkpoint);
263
264        self.stream_from(start_sequence).await
265    }
266
267    /// Get the current checkpoint
268    pub fn get_checkpoint(&self) -> Option<Checkpoint> {
269        self.checkpoint.read().clone()
270    }
271
272    /// Set a checkpoint manually
273    pub fn set_checkpoint(&self, checkpoint: Checkpoint) {
274        *self.checkpoint.write() = Some(checkpoint);
275    }
276
277    /// Clear the checkpoint
278    pub fn clear_checkpoint(&self) {
279        *self.checkpoint.write() = None;
280    }
281}
282
283/// Manager for multiple replication streams (consumer groups)
284pub struct StreamManager {
285    /// The replication log
286    log: Arc<ReplicationLog>,
287    /// Active streams by consumer ID
288    streams: Arc<RwLock<Vec<Arc<ReplicationStream>>>>,
289}
290
291impl StreamManager {
292    /// Create a new stream manager
293    pub fn new(log: Arc<ReplicationLog>) -> Self {
294        Self {
295            log,
296            streams: Arc::new(RwLock::new(Vec::new())),
297        }
298    }
299
300    /// Create a new stream for a consumer
301    pub fn create_stream(&self, consumer_id: impl Into<String>) -> Arc<ReplicationStream> {
302        let stream = Arc::new(ReplicationStream::new(self.log.clone(), consumer_id));
303        self.streams.write().push(stream.clone());
304        stream
305    }
306
307    /// Create a stream with custom configuration
308    pub fn create_stream_with_config(
309        &self,
310        consumer_id: impl Into<String>,
311        config: StreamConfig,
312    ) -> Arc<ReplicationStream> {
313        let stream = Arc::new(ReplicationStream::with_config(
314            self.log.clone(),
315            consumer_id,
316            config,
317        ));
318        self.streams.write().push(stream.clone());
319        stream
320    }
321
322    /// Get all active streams
323    pub fn active_streams(&self) -> Vec<Arc<ReplicationStream>> {
324        self.streams.read().clone()
325    }
326
327    /// Get the number of active streams
328    pub fn stream_count(&self) -> usize {
329        self.streams.read().len()
330    }
331}
332
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_change_event_creation() {
        let event = ChangeEvent::new(
            1,
            ChangeOperation::Insert,
            "vectors".to_string(),
            "doc-1".to_string(),
            b"data".to_vec(),
        );

        assert_eq!(event.sequence, 1);
        assert_eq!(event.operation, ChangeOperation::Insert);
        assert_eq!(event.collection, "vectors");
        assert_eq!(event.document_id, "doc-1");
        assert_eq!(event.data, b"data".to_vec());
        assert!(event.metadata.is_null());
    }

    #[test]
    fn test_change_event_with_metadata() {
        let metadata = serde_json::json!({ "origin": "unit-test" });
        let event = ChangeEvent::new(
            2,
            ChangeOperation::Delete,
            "vectors".to_string(),
            "doc-2".to_string(),
            Vec::new(),
        )
        .with_metadata(metadata.clone());

        assert_eq!(event.metadata, metadata);
    }

    #[test]
    fn test_checkpoint() {
        let cp = Checkpoint::new(100, "consumer-1").with_group("group-1");

        assert_eq!(cp.sequence, 100);
        assert_eq!(cp.consumer_id, "consumer-1");
        assert_eq!(cp.consumer_group, Some("group-1".to_string()));

        // Without `with_group` no consumer group is set.
        assert!(Checkpoint::new(7, "consumer-2").consumer_group.is_none());
    }

    #[tokio::test]
    async fn test_replication_stream() {
        let log = Arc::new(ReplicationLog::new("replica-1"));

        // Add some entries
        log.append(b"data1".to_vec());
        log.append(b"data2".to_vec());
        log.append(b"data3".to_vec());

        let stream = ReplicationStream::new(log.clone(), "consumer-1");
        let mut rx = stream.stream_from(0).await.unwrap();

        // Bound the wait so a regression fails the test instead of hanging it,
        // and assert unconditionally so a missing batch cannot pass silently.
        let events = tokio::time::timeout(std::time::Duration::from_secs(5), rx.recv())
            .await
            .expect("timed out waiting for the first batch")
            .expect("stream closed before yielding a batch");

        assert!(!events.is_empty());
        for event in &events {
            // The stream currently emits placeholder operation/collection values.
            assert_eq!(event.operation, ChangeOperation::Update);
            assert_eq!(event.collection, "default");
        }
    }

    #[test]
    fn test_manual_checkpoint_roundtrip() {
        let log = Arc::new(ReplicationLog::new("replica-1"));
        let stream = ReplicationStream::new(log, "consumer-1");

        assert!(stream.get_checkpoint().is_none());

        stream.set_checkpoint(Checkpoint::new(42, "consumer-1"));
        let cp = stream.get_checkpoint().expect("checkpoint was just set");
        assert_eq!(cp.sequence, 42);
        assert_eq!(cp.consumer_id, "consumer-1");

        stream.clear_checkpoint();
        assert!(stream.get_checkpoint().is_none());
    }

    #[test]
    fn test_stream_manager() {
        let log = Arc::new(ReplicationLog::new("replica-1"));
        let manager = StreamManager::new(log);

        let stream1 = manager.create_stream("consumer-1");
        let stream2 = manager.create_stream("consumer-2");

        assert_eq!(manager.stream_count(), 2);

        // The manager hands back the very streams it registered, in order.
        let active = manager.active_streams();
        assert_eq!(active.len(), 2);
        assert!(Arc::ptr_eq(&active[0], &stream1));
        assert!(Arc::ptr_eq(&active[1], &stream2));
    }

    #[test]
    fn test_stream_config() {
        let config = StreamConfig {
            buffer_size: 2000,
            batch_size: 50,
            auto_checkpoint: false,
            checkpoint_interval: 200,
        };

        assert_eq!(config.buffer_size, 2000);
        assert_eq!(config.batch_size, 50);
        assert!(!config.auto_checkpoint);
        assert_eq!(config.checkpoint_interval, 200);
    }

    #[test]
    fn test_stream_config_default() {
        let config = StreamConfig::default();

        assert_eq!(config.buffer_size, 1000);
        assert_eq!(config.batch_size, 100);
        assert!(config.auto_checkpoint);
        assert_eq!(config.checkpoint_interval, 100);
    }
}