use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use crate::error::{Error, Result};
use crate::types::{Metadata, VectorId};
use crate::VectorDB;
/// Kind of mutation recorded by a [`ChangeEntry`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum OperationType {
/// A document was added.
Insert,
/// An existing document's vector and/or metadata was replaced.
Update,
/// A document was removed.
Delete,
}
/// One recorded mutation of a document, suitable for shipping to another instance.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeEntry {
/// Sequence number assigned by whoever created the entry.
pub sequence: u64,
/// Which kind of mutation this entry describes.
pub operation: OperationType,
/// Identifier of the affected document.
pub document_id: VectorId,
/// Vector payload; the `insert`/`update` constructors set it, `delete` leaves it `None`.
pub vector: Option<Vec<f32>>,
/// Optional metadata payload; always `None` for entries built by `delete`.
pub metadata: Option<Metadata>,
/// Creation time in milliseconds since the Unix epoch.
pub timestamp: u64,
/// Identifier of the instance (or other origin label) that produced the entry.
pub origin_id: String,
}
impl ChangeEntry {
pub fn insert(
sequence: u64,
origin_id: &str,
doc_id: impl Into<VectorId>,
vector: Vec<f32>,
metadata: Option<Metadata>,
) -> Self {
Self {
sequence,
operation: OperationType::Insert,
document_id: doc_id.into(),
vector: Some(vector),
metadata,
timestamp: current_timestamp(),
origin_id: origin_id.to_string(),
}
}
pub fn update(
sequence: u64,
origin_id: &str,
doc_id: impl Into<VectorId>,
vector: Vec<f32>,
metadata: Option<Metadata>,
) -> Self {
Self {
sequence,
operation: OperationType::Update,
document_id: doc_id.into(),
vector: Some(vector),
metadata,
timestamp: current_timestamp(),
origin_id: origin_id.to_string(),
}
}
pub fn delete(sequence: u64, origin_id: &str, doc_id: impl Into<VectorId>) -> Self {
Self {
sequence,
operation: OperationType::Delete,
document_id: doc_id.into(),
vector: None,
metadata: None,
timestamp: current_timestamp(),
origin_id: origin_id.to_string(),
}
}
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch the result is `0`.
fn current_timestamp() -> u64 {
    let now = SystemTime::now();
    let since_epoch = now.duration_since(UNIX_EPOCH).unwrap_or_default();
    since_epoch.as_millis() as u64
}
/// Generates an identifier of the form `inst-<pid>-<nanos>-<counter>` (all hex).
///
/// The components are the process id, the wall-clock time in nanoseconds
/// since the Unix epoch, and a process-wide counter. The counter guarantees
/// that two ids generated by the same process never collide, even when they
/// are created within the same clock tick.
fn generate_instance_id() -> String {
    // Process-wide ordinal; distinguishes ids created back-to-back.
    static NEXT_ORDINAL: AtomicU64 = AtomicU64::new(0);

    let pid = std::process::id();
    // Wall-clock time, not `Instant::now().elapsed()`: the latter only measures
    // the few nanoseconds since the `Instant` was created and is therefore
    // nearly constant.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let ordinal = NEXT_ORDINAL.fetch_add(1, Ordering::SeqCst);
    format!("inst-{:x}-{:x}-{:x}", pid, nanos, ordinal)
}
/// In-memory, append-only log of document changes made by one instance.
#[derive(Debug)]
pub struct ChangeLog {
// Identifier stamped into every entry's `origin_id`.
instance_id: String,
// Sequence number the next tracked change will receive.
sequence: AtomicU64,
// Recorded changes, in the order they were tracked.
entries: RwLock<Vec<ChangeEntry>>,
// Value of `sequence` captured by the most recent `checkpoint()` call.
last_checkpoint: AtomicU64,
// Once the log holds more entries than this, it is trimmed to the newest half of this value.
max_entries: usize,
}
impl Default for ChangeLog {
fn default() -> Self {
Self::new()
}
}
impl ChangeLog {
pub fn new() -> Self {
Self {
instance_id: generate_instance_id(),
sequence: AtomicU64::new(0),
entries: RwLock::new(Vec::new()),
last_checkpoint: AtomicU64::new(0),
max_entries: 10000,
}
}
pub fn with_instance_id(instance_id: impl Into<String>) -> Self {
Self {
instance_id: instance_id.into(),
sequence: AtomicU64::new(0),
entries: RwLock::new(Vec::new()),
last_checkpoint: AtomicU64::new(0),
max_entries: 10000,
}
}
pub fn instance_id(&self) -> &str {
&self.instance_id
}
pub fn current_sequence(&self) -> u64 {
self.sequence.load(Ordering::SeqCst)
}
pub fn track_insert(
&self,
doc_id: impl Into<VectorId>,
vector: &[f32],
metadata: Option<Metadata>,
) -> u64 {
let seq = {
let mut entries = self.entries.write();
let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
let entry =
ChangeEntry::insert(seq, &self.instance_id, doc_id, vector.to_vec(), metadata);
entries.push(entry);
seq
};
self.maybe_compact();
seq
}
pub fn track_update(
&self,
doc_id: impl Into<VectorId>,
vector: &[f32],
metadata: Option<Metadata>,
) -> u64 {
let seq = {
let mut entries = self.entries.write();
let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
let entry =
ChangeEntry::update(seq, &self.instance_id, doc_id, vector.to_vec(), metadata);
entries.push(entry);
seq
};
self.maybe_compact();
seq
}
pub fn track_delete(&self, doc_id: impl Into<VectorId>) -> u64 {
let seq = {
let mut entries = self.entries.write();
let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
let entry = ChangeEntry::delete(seq, &self.instance_id, doc_id);
entries.push(entry);
seq
};
self.maybe_compact();
seq
}
pub fn export_since(&self, since_sequence: u64) -> Vec<ChangeEntry> {
self.entries
.read()
.iter()
.filter(|e| e.sequence >= since_sequence)
.cloned()
.collect()
}
pub fn export_all(&self) -> Vec<ChangeEntry> {
self.entries.read().clone()
}
pub fn checkpoint(&self) -> u64 {
let seq = self.sequence.load(Ordering::SeqCst);
self.last_checkpoint.store(seq, Ordering::SeqCst);
seq
}
pub fn last_checkpoint(&self) -> u64 {
self.last_checkpoint.load(Ordering::SeqCst)
}
pub fn export_since_checkpoint(&self) -> Vec<ChangeEntry> {
self.export_since(self.last_checkpoint())
}
pub fn len(&self) -> usize {
self.entries.read().len()
}
pub fn is_empty(&self) -> bool {
self.entries.read().is_empty()
}
pub fn truncate_before(&self, sequence: u64) {
self.entries.write().retain(|e| e.sequence >= sequence);
}
fn maybe_compact(&self) {
let len = self.entries.read().len();
if len > self.max_entries {
let keep = self.max_entries / 2;
let mut entries = self.entries.write();
if entries.len() > keep {
let start = entries.len() - keep;
*entries = entries[start..].to_vec();
}
}
}
pub fn to_json(&self) -> Result<String> {
let data = ChangeLogData {
instance_id: self.instance_id.clone(),
sequence: self.sequence.load(Ordering::SeqCst),
entries: self.entries.read().clone(),
last_checkpoint: self.last_checkpoint.load(Ordering::SeqCst),
};
serde_json::to_string(&data).map_err(|e| Error::Serialization(e.to_string()))
}
pub fn from_json(json: &str) -> Result<Self> {
let data: ChangeLogData =
serde_json::from_str(json).map_err(|e| Error::Serialization(e.to_string()))?;
Ok(Self {
instance_id: data.instance_id,
sequence: AtomicU64::new(data.sequence),
entries: RwLock::new(data.entries),
last_checkpoint: AtomicU64::new(data.last_checkpoint),
max_entries: 10000,
})
}
}
/// Serialized form of a `ChangeLog`, used by `ChangeLog::to_json` and
/// `ChangeLog::from_json`. The log's entry cap is not part of it.
#[derive(Serialize, Deserialize)]
struct ChangeLogData {
// Instance id of the log.
instance_id: String,
// Value of the log's sequence counter when it was serialized.
sequence: u64,
// Entries held by the log when it was serialized.
entries: Vec<ChangeEntry>,
// Value of the log's checkpoint when it was serialized.
last_checkpoint: u64,
}
/// Bookkeeping kept by `ReplicationManager` for one remote instance.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationState {
/// Instance id of the local change log that was passed to `sync`.
pub local_id: String,
/// Identifier of the remote instance this state belongs to.
pub remote_id: String,
/// Sequence position recorded by `ReplicationManager::sync` for this remote.
pub last_synced_sequence: u64,
/// Time of the most recent sync, in milliseconds since the Unix epoch.
pub last_sync_time: u64,
/// Running total of changes applied from this remote across all syncs.
pub changes_applied: u64,
}
/// Outcome of applying a batch of changes to a database.
#[derive(Debug, Clone)]
pub struct SyncResult {
/// Number of changes that modified the database.
pub applied: usize,
/// Number of changes that were not applied (e.g. insert of an existing id,
/// delete of a missing id).
pub skipped: usize,
/// Conflicts detected while applying. `ReplicationManager::apply_changes`
/// in this file always leaves it empty.
pub conflicts: Vec<ConflictInfo>,
/// Highest `sequence` among the changes handed to
/// `ReplicationManager::apply_changes` (0 for an empty batch).
pub new_sequence: u64,
}
/// Description of a conflict between a local and a remote change to the same
/// document. NOTE(review): nothing in this file constructs this type yet.
#[derive(Debug, Clone)]
pub struct ConflictInfo {
/// Document both changes refer to.
pub document_id: VectorId,
/// Operation recorded locally.
pub local_operation: OperationType,
/// Operation received from the remote.
pub remote_operation: OperationType,
/// Timestamp of the local change (presumably epoch milliseconds, like
/// `ChangeEntry::timestamp` — confirm with the producer).
pub local_timestamp: u64,
/// Timestamp of the remote change (same presumed unit as `local_timestamp`).
pub remote_timestamp: u64,
/// How the conflict was, or should be, resolved.
pub resolution: ConflictResolution,
}
/// Strategy for resolving a conflict between a local and a remote change.
///
/// NOTE(review): the variants are stored in `ReplicationManager` and
/// `ReplicationConfig`, but no code in this file branches on them; the
/// per-variant descriptions below are inferred from the names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictResolution {
/// Presumably: the local version wins.
KeepLocal,
/// Presumably: the remote version wins.
ApplyRemote,
/// Presumably: the change with the later timestamp wins. This is the
/// default chosen by `ReplicationManager::new` and `ReplicationConfig::default`.
LastWriteWins,
}
/// Applies remote change sets to a local database and tracks per-remote sync state.
pub struct ReplicationManager {
// Sync bookkeeping, keyed by remote instance id.
states: RwLock<HashMap<String, ReplicationState>>,
// Configured conflict strategy. NOTE(review): stored but not read anywhere in this file.
conflict_strategy: ConflictResolution,
}
impl Default for ReplicationManager {
fn default() -> Self {
Self::new()
}
}
impl ReplicationManager {
    /// Creates a manager with no known remotes and the
    /// [`ConflictResolution::LastWriteWins`] strategy.
    pub fn new() -> Self {
        Self {
            states: RwLock::new(HashMap::new()),
            conflict_strategy: ConflictResolution::LastWriteWins,
        }
    }

    /// Replaces the configured conflict strategy (builder style).
    pub fn with_conflict_strategy(mut self, strategy: ConflictResolution) -> Self {
        self.conflict_strategy = strategy;
        self
    }

    /// Applies `changes` to `db` in slice order.
    ///
    /// * Insert: inserted; counted as skipped if the id already exists.
    /// * Update: updates the document, or inserts it when it is missing.
    /// * Delete: counted as applied when the database reports a removal,
    ///   otherwise as skipped.
    /// * Insert/Update entries without a vector cannot be applied and are
    ///   counted as skipped.
    ///
    /// `new_sequence` in the result is the highest sequence in `changes`
    /// (0 for an empty slice). `conflicts` is always empty.
    ///
    /// # Errors
    ///
    /// Returns the first database error other than `Error::AlreadyExists` on
    /// insert. Changes applied before the failure stay applied.
    pub fn apply_changes(db: &VectorDB, changes: &[ChangeEntry]) -> Result<SyncResult> {
        let mut applied = 0;
        let mut skipped = 0;
        let mut last_seq = 0u64;
        for change in changes {
            last_seq = last_seq.max(change.sequence);
            match (change.operation, change.vector.as_ref()) {
                (OperationType::Insert, Some(vector)) => {
                    match db.insert_document(
                        &change.document_id,
                        Some(vector),
                        change.metadata.clone(),
                    ) {
                        Ok(()) => applied += 1,
                        Err(Error::AlreadyExists(_)) => skipped += 1,
                        Err(e) => return Err(e),
                    }
                }
                (OperationType::Update, Some(vector)) => {
                    if db.contains(&change.document_id) {
                        db.update(&change.document_id, vector, change.metadata.clone())?;
                    } else {
                        // Upsert: the document never reached this replica.
                        db.insert_document(
                            &change.document_id,
                            Some(vector),
                            change.metadata.clone(),
                        )?;
                    }
                    applied += 1;
                }
                (OperationType::Delete, _) => {
                    if db.delete(&change.document_id)? {
                        applied += 1;
                    } else {
                        skipped += 1;
                    }
                }
                // Malformed entry: nothing to write, but account for it so
                // applied + skipped equals the number of changes processed.
                (OperationType::Insert, None) | (OperationType::Update, None) => skipped += 1,
            }
        }
        Ok(SyncResult {
            applied,
            skipped,
            conflicts: Vec::new(),
            new_sequence: last_seq,
        })
    }

    /// Applies the not-yet-seen part of `remote_changes` to `local_db` and
    /// updates the bookkeeping for `remote_id`.
    ///
    /// For a remote that has been synced before, only changes whose sequence
    /// is greater than its `last_synced_sequence` are applied. For a remote
    /// with no recorded state, every change is applied, including sequence 0.
    /// State for a remote is first recorded by a sync that processed at least
    /// one change, so that "never synced" and "synced through sequence 0"
    /// stay distinguishable.
    ///
    /// The recorded `last_synced_sequence` never moves backwards. For a known
    /// remote the returned `new_sequence` is that high-water mark, even when
    /// the batch contained nothing new.
    ///
    /// # Errors
    ///
    /// Propagates errors from [`ReplicationManager::apply_changes`]; the
    /// recorded state is left untouched in that case.
    pub fn sync(
        &self,
        local_db: &VectorDB,
        local_log: &ChangeLog,
        remote_changes: &[ChangeEntry],
        remote_id: &str,
    ) -> Result<SyncResult> {
        use std::collections::hash_map::Entry;

        // Held for the whole call so concurrent syncs of one remote cannot
        // apply the same changes twice.
        let mut states = self.states.write();
        let high_water = states.get(remote_id).map(|s| s.last_synced_sequence);
        let new_changes: Vec<ChangeEntry> = match high_water {
            Some(seq) => remote_changes
                .iter()
                .filter(|c| c.sequence > seq)
                .cloned()
                .collect(),
            None => remote_changes.to_vec(),
        };

        let mut result = Self::apply_changes(local_db, &new_changes)?;
        let now = current_timestamp();
        match states.entry(remote_id.to_string()) {
            Entry::Occupied(mut slot) => {
                let state = slot.get_mut();
                // An empty batch reports sequence 0; never regress the mark.
                state.last_synced_sequence = state.last_synced_sequence.max(result.new_sequence);
                state.last_sync_time = now;
                state.changes_applied += result.applied as u64;
                result.new_sequence = state.last_synced_sequence;
            }
            Entry::Vacant(slot) => {
                if !new_changes.is_empty() {
                    slot.insert(ReplicationState {
                        local_id: local_log.instance_id().to_string(),
                        remote_id: remote_id.to_string(),
                        last_synced_sequence: result.new_sequence,
                        last_sync_time: now,
                        changes_applied: result.applied as u64,
                    });
                }
            }
        }
        Ok(result)
    }

    /// Returns a copy of the recorded state for `remote_id`, if any.
    pub fn get_state(&self, remote_id: &str) -> Option<ReplicationState> {
        self.states.read().get(remote_id).cloned()
    }

    /// Returns copies of the recorded state of every known remote.
    pub fn list_remotes(&self) -> Vec<ReplicationState> {
        self.states.read().values().cloned().collect()
    }

    /// Captures the current content of `db` as a list of insert entries.
    ///
    /// Each entry has origin `"snapshot"` and uses the document's position in
    /// `db.list_ids()` as its sequence. Ids that are missing or have no vector
    /// are left out.
    ///
    /// # Errors
    ///
    /// Propagates errors from `db.list_ids()` and `db.get()`.
    pub fn create_snapshot(db: &VectorDB) -> Result<Vec<ChangeEntry>> {
        let ids = db.list_ids()?;
        let mut entries = Vec::with_capacity(ids.len());
        for (i, id) in ids.iter().enumerate() {
            if let Some((Some(vec), metadata)) = db.get(id)? {
                entries.push(ChangeEntry::insert(
                    i as u64,
                    "snapshot",
                    id.clone(),
                    vec,
                    metadata,
                ));
            }
        }
        Ok(entries)
    }

    /// Applies a snapshot to `db` and returns how many entries were applied.
    ///
    /// # Errors
    ///
    /// Propagates errors from [`ReplicationManager::apply_changes`].
    pub fn apply_snapshot(db: &VectorDB, snapshot: &[ChangeEntry]) -> Result<usize> {
        let result = Self::apply_changes(db, snapshot)?;
        Ok(result.applied)
    }
}
/// Settings bundle for replication.
///
/// NOTE(review): no code in this file consumes this struct; `ChangeLog` and
/// `ReplicationManager` are constructed without it.
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
/// Identifier of the local instance.
pub instance_id: String,
/// Conflict strategy to use.
pub conflict_strategy: ConflictResolution,
/// Intended cap on change-log entries (default 10000).
pub max_log_entries: usize,
/// Whether the log should compact itself automatically (default `true`).
pub auto_compact: bool,
}
impl Default for ReplicationConfig {
fn default() -> Self {
Self {
instance_id: generate_instance_id(),
conflict_strategy: ConflictResolution::LastWriteWins,
max_log_entries: 10000,
auto_compact: true,
}
}
}
impl ReplicationConfig {
    /// Creates a configuration populated with the default values.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns the configuration with its instance id replaced by `id`.
    pub fn with_instance_id(self, id: impl Into<String>) -> Self {
        Self {
            instance_id: id.into(),
            ..self
        }
    }

    /// Returns the configuration with its conflict strategy replaced by `strategy`.
    pub fn with_conflict_strategy(self, strategy: ConflictResolution) -> Self {
        Self {
            conflict_strategy: strategy,
            ..self
        }
    }

    /// Returns the configuration with its log-entry cap replaced by `max`.
    pub fn with_max_log_entries(self, max: usize) -> Self {
        Self {
            max_log_entries: max,
            ..self
        }
    }
}
// Unit tests for the change log and the replication helpers.
#[cfg(test)]
mod tests {
use super::*;
use crate::Config;
// Sequence numbers start at 0 and increase by one per tracked change.
#[test]
fn test_change_log_tracking() {
let log = ChangeLog::new();
let seq1 = log.track_insert("doc-1", &[0.1, 0.2, 0.3], None);
let seq2 = log.track_update("doc-1", &[0.2, 0.3, 0.4], None);
let seq3 = log.track_delete("doc-1");
assert_eq!(seq1, 0);
assert_eq!(seq2, 1);
assert_eq!(seq3, 2);
assert_eq!(log.len(), 3);
}
// `export_since` is inclusive: asking for 1 returns the entries with sequence 1 and 2.
#[test]
fn test_export_since() {
let log = ChangeLog::new();
log.track_insert("doc-1", &[0.1; 3], None);
log.track_insert("doc-2", &[0.2; 3], None);
log.track_insert("doc-3", &[0.3; 3], None);
let changes = log.export_since(1);
assert_eq!(changes.len(), 2);
assert_eq!(changes[0].document_id, "doc-2");
assert_eq!(changes[1].document_id, "doc-3");
}
// A checkpoint records the next sequence to be assigned (2 after two inserts),
// so only changes tracked afterwards are exported.
#[test]
fn test_checkpoint() {
let log = ChangeLog::new();
log.track_insert("doc-1", &[0.1; 3], None);
log.track_insert("doc-2", &[0.2; 3], None);
let checkpoint = log.checkpoint();
assert_eq!(checkpoint, 2);
log.track_insert("doc-3", &[0.3; 3], None);
let changes = log.export_since_checkpoint();
assert_eq!(changes.len(), 1);
assert_eq!(changes[0].document_id, "doc-3");
}
// Two remote inserts into an empty database are both applied.
#[test]
fn test_apply_changes() {
let db = VectorDB::new(Config::new(3)).unwrap();
let changes = vec![
ChangeEntry::insert(0, "remote", "doc-1", vec![0.1, 0.2, 0.3], None),
ChangeEntry::insert(1, "remote", "doc-2", vec![0.4, 0.5, 0.6], None),
];
let result = ReplicationManager::apply_changes(&db, &changes).unwrap();
assert_eq!(result.applied, 2);
assert_eq!(result.skipped, 0);
assert_eq!(db.len(), 2);
}
// A snapshot of one database can be replayed into an empty replica.
#[test]
fn test_snapshot_and_restore() {
let source = VectorDB::new(Config::new(3)).unwrap();
source.insert("doc-1", &[0.1, 0.2, 0.3], None).unwrap();
source.insert("doc-2", &[0.4, 0.5, 0.6], None).unwrap();
let snapshot = ReplicationManager::create_snapshot(&source).unwrap();
assert_eq!(snapshot.len(), 2);
let replica = VectorDB::new(Config::new(3)).unwrap();
let count = ReplicationManager::apply_snapshot(&replica, &snapshot).unwrap();
assert_eq!(count, 2);
assert_eq!(replica.len(), 2);
assert!(replica.contains("doc-1"));
assert!(replica.contains("doc-2"));
}
// A log survives a JSON round trip with its instance id and entries intact.
#[test]
fn test_log_serialization() {
let log = ChangeLog::with_instance_id("test-instance");
log.track_insert("doc-1", &[0.1, 0.2], None);
log.track_delete("doc-2");
let json = log.to_json().unwrap();
let restored = ChangeLog::from_json(&json).unwrap();
assert_eq!(restored.instance_id(), "test-instance");
assert_eq!(restored.len(), 2);
}
// NOTE(review): despite its name this test never calls `ReplicationManager::sync`;
// `_manager` is unused and the replica is filled from a full snapshot.
#[test]
fn test_incremental_sync() {
let primary = VectorDB::new(Config::new(3)).unwrap();
let log = ChangeLog::with_instance_id("primary");
log.track_insert("doc-1", &[0.1; 3], None);
primary.insert("doc-1", &[0.1; 3], None).unwrap();
log.checkpoint();
log.track_insert("doc-2", &[0.2; 3], None);
primary.insert("doc-2", &[0.2; 3], None).unwrap();
let replica = VectorDB::new(Config::new(3)).unwrap();
let _manager = ReplicationManager::new();
let snapshot = ReplicationManager::create_snapshot(&primary).unwrap();
ReplicationManager::apply_snapshot(&replica, &snapshot).unwrap();
assert_eq!(replica.len(), 2);
}
}