ruvector-replication 2.0.6

Data replication and synchronization for ruvector
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
//! Change data capture and streaming for replication
//!
//! Provides mechanisms for streaming changes from the replication log
//! with support for checkpointing, resumption, and backpressure handling.

use crate::{LogEntry, ReplicationError, ReplicationLog, Result};
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::mpsc;
use uuid::Uuid;

/// Type of change operation
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangeOperation {
    /// Insert operation
    Insert,
    /// Update operation
    Update,
    /// Delete operation
    Delete,
    /// Bulk operation
    Bulk,
}

/// A change event in the replication stream
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeEvent {
    /// Unique identifier for this event
    pub id: Uuid,
    /// Sequence number in the stream
    pub sequence: u64,
    /// Timestamp of the change
    pub timestamp: DateTime<Utc>,
    /// Type of operation
    pub operation: ChangeOperation,
    /// Collection/table name
    pub collection: String,
    /// Document/vector ID affected
    pub document_id: String,
    /// Serialized data for the change
    pub data: Vec<u8>,
    /// Metadata for the change
    pub metadata: serde_json::Value,
}

impl ChangeEvent {
    /// Create a new change event
    pub fn new(
        sequence: u64,
        operation: ChangeOperation,
        collection: String,
        document_id: String,
        data: Vec<u8>,
    ) -> Self {
        Self {
            id: Uuid::new_v4(),
            sequence,
            timestamp: Utc::now(),
            operation,
            collection,
            document_id,
            data,
            metadata: serde_json::Value::Null,
        }
    }

    /// Add metadata to the change event
    pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
        self.metadata = metadata;
        self
    }

    /// Convert from a log entry
    pub fn from_log_entry(
        entry: &LogEntry,
        operation: ChangeOperation,
        collection: String,
        document_id: String,
    ) -> Self {
        Self {
            id: entry.id,
            sequence: entry.sequence,
            timestamp: entry.timestamp,
            operation,
            collection,
            document_id,
            data: entry.data.clone(),
            metadata: serde_json::json!({
                "source_replica": entry.source_replica,
                "checksum": entry.checksum,
            }),
        }
    }
}

/// Checkpoint for resuming a replication stream
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Checkpoint {
    /// Last processed sequence number
    pub sequence: u64,
    /// Timestamp of the checkpoint
    pub timestamp: DateTime<Utc>,
    /// Optional consumer group ID
    pub consumer_group: Option<String>,
    /// Consumer ID within the group
    pub consumer_id: String,
}

impl Checkpoint {
    /// Create a new checkpoint
    pub fn new(sequence: u64, consumer_id: impl Into<String>) -> Self {
        Self {
            sequence,
            timestamp: Utc::now(),
            consumer_group: None,
            consumer_id: consumer_id.into(),
        }
    }

    /// Set the consumer group
    pub fn with_group(mut self, group: impl Into<String>) -> Self {
        self.consumer_group = Some(group.into());
        self
    }
}

/// Configuration for a replication stream
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Buffer size for the channel
    pub buffer_size: usize,
    /// Batch size for events
    pub batch_size: usize,
    /// Enable automatic checkpointing
    pub auto_checkpoint: bool,
    /// Checkpoint interval (number of events)
    pub checkpoint_interval: usize,
}

impl Default for StreamConfig {
    fn default() -> Self {
        Self {
            buffer_size: 1000,
            batch_size: 100,
            auto_checkpoint: true,
            checkpoint_interval: 100,
        }
    }
}

/// Manages a replication stream
pub struct ReplicationStream {
    /// The replication log
    log: Arc<ReplicationLog>,
    /// Stream configuration
    config: StreamConfig,
    /// Current checkpoint
    checkpoint: Arc<RwLock<Option<Checkpoint>>>,
    /// Consumer ID
    consumer_id: String,
}

impl ReplicationStream {
    /// Create a new replication stream
    pub fn new(log: Arc<ReplicationLog>, consumer_id: impl Into<String>) -> Self {
        Self {
            log,
            config: StreamConfig::default(),
            checkpoint: Arc::new(RwLock::new(None)),
            consumer_id: consumer_id.into(),
        }
    }

    /// Create with custom configuration
    pub fn with_config(
        log: Arc<ReplicationLog>,
        consumer_id: impl Into<String>,
        config: StreamConfig,
    ) -> Self {
        Self {
            log,
            config,
            checkpoint: Arc::new(RwLock::new(None)),
            consumer_id: consumer_id.into(),
        }
    }

    /// Start streaming from a given position
    pub async fn stream_from(
        &self,
        start_sequence: u64,
    ) -> Result<mpsc::Receiver<Vec<ChangeEvent>>> {
        let (tx, rx) = mpsc::channel(self.config.buffer_size);

        let log = self.log.clone();
        let batch_size = self.config.batch_size;
        let checkpoint = self.checkpoint.clone();
        let auto_checkpoint = self.config.auto_checkpoint;
        let checkpoint_interval = self.config.checkpoint_interval;
        let consumer_id = self.consumer_id.clone();

        tokio::spawn(async move {
            let mut current_sequence = start_sequence;
            let mut events_since_checkpoint = 0;

            loop {
                // Get batch of entries
                let entries =
                    log.get_range(current_sequence + 1, current_sequence + batch_size as u64);

                if entries.is_empty() {
                    // No new entries, wait a bit
                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                    continue;
                }

                // Convert to change events
                let mut events = Vec::new();
                for entry in &entries {
                    // In a real implementation, we would decode the operation type
                    // from the entry data. For now, we use a placeholder.
                    let event = ChangeEvent::from_log_entry(
                        entry,
                        ChangeOperation::Update,
                        "default".to_string(),
                        Uuid::new_v4().to_string(),
                    );
                    events.push(event);
                }

                // Update current sequence
                if let Some(last_entry) = entries.last() {
                    current_sequence = last_entry.sequence;
                }

                // Send batch
                if tx.send(events).await.is_err() {
                    // Receiver dropped, stop streaming
                    break;
                }

                events_since_checkpoint += entries.len();

                // Auto-checkpoint if enabled
                if auto_checkpoint && events_since_checkpoint >= checkpoint_interval {
                    let cp = Checkpoint::new(current_sequence, consumer_id.clone());
                    *checkpoint.write() = Some(cp);
                    events_since_checkpoint = 0;
                }
            }
        });

        Ok(rx)
    }

    /// Resume streaming from the last checkpoint
    pub async fn resume(&self) -> Result<mpsc::Receiver<Vec<ChangeEvent>>> {
        let checkpoint = self.checkpoint.read();
        let start_sequence = checkpoint.as_ref().map(|cp| cp.sequence).unwrap_or(0);
        drop(checkpoint);

        self.stream_from(start_sequence).await
    }

    /// Get the current checkpoint
    pub fn get_checkpoint(&self) -> Option<Checkpoint> {
        self.checkpoint.read().clone()
    }

    /// Set a checkpoint manually
    pub fn set_checkpoint(&self, checkpoint: Checkpoint) {
        *self.checkpoint.write() = Some(checkpoint);
    }

    /// Clear the checkpoint
    pub fn clear_checkpoint(&self) {
        *self.checkpoint.write() = None;
    }
}

/// Manager for multiple replication streams (consumer groups)
pub struct StreamManager {
    /// The replication log
    log: Arc<ReplicationLog>,
    /// Active streams by consumer ID
    streams: Arc<RwLock<Vec<Arc<ReplicationStream>>>>,
}

impl StreamManager {
    /// Create a new stream manager
    pub fn new(log: Arc<ReplicationLog>) -> Self {
        Self {
            log,
            streams: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Create a new stream for a consumer
    pub fn create_stream(&self, consumer_id: impl Into<String>) -> Arc<ReplicationStream> {
        let stream = Arc::new(ReplicationStream::new(self.log.clone(), consumer_id));
        self.streams.write().push(stream.clone());
        stream
    }

    /// Create a stream with custom configuration
    pub fn create_stream_with_config(
        &self,
        consumer_id: impl Into<String>,
        config: StreamConfig,
    ) -> Arc<ReplicationStream> {
        let stream = Arc::new(ReplicationStream::with_config(
            self.log.clone(),
            consumer_id,
            config,
        ));
        self.streams.write().push(stream.clone());
        stream
    }

    /// Get all active streams
    pub fn active_streams(&self) -> Vec<Arc<ReplicationStream>> {
        self.streams.read().clone()
    }

    /// Get the number of active streams
    pub fn stream_count(&self) -> usize {
        self.streams.read().len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_change_event_creation() {
        let event = ChangeEvent::new(
            1,
            ChangeOperation::Insert,
            "vectors".to_string(),
            "doc-1".to_string(),
            b"data".to_vec(),
        );

        assert_eq!(event.sequence, 1);
        assert_eq!(event.operation, ChangeOperation::Insert);
        assert_eq!(event.collection, "vectors");
    }

    #[test]
    fn test_checkpoint() {
        let cp = Checkpoint::new(100, "consumer-1").with_group("group-1");

        assert_eq!(cp.sequence, 100);
        assert_eq!(cp.consumer_id, "consumer-1");
        assert_eq!(cp.consumer_group, Some("group-1".to_string()));
    }

    #[tokio::test]
    async fn test_replication_stream() {
        let log = Arc::new(ReplicationLog::new("replica-1"));

        // Add some entries
        log.append(b"data1".to_vec());
        log.append(b"data2".to_vec());
        log.append(b"data3".to_vec());

        let stream = ReplicationStream::new(log.clone(), "consumer-1");
        let mut rx = stream.stream_from(0).await.unwrap();

        // Receive events
        if let Some(events) = rx.recv().await {
            assert!(!events.is_empty());
        }
    }

    #[test]
    fn test_stream_manager() {
        let log = Arc::new(ReplicationLog::new("replica-1"));
        let manager = StreamManager::new(log);

        let stream1 = manager.create_stream("consumer-1");
        let stream2 = manager.create_stream("consumer-2");

        assert_eq!(manager.stream_count(), 2);
    }

    #[test]
    fn test_stream_config() {
        let config = StreamConfig {
            buffer_size: 2000,
            batch_size: 50,
            auto_checkpoint: false,
            checkpoint_interval: 200,
        };

        assert_eq!(config.buffer_size, 2000);
        assert_eq!(config.batch_size, 50);
        assert!(!config.auto_checkpoint);
    }
}