use alloc::collections::BTreeMap;
use alloc::string::String;
use alloc::vec::Vec;
use lazy_static::lazy_static;
use spin::Mutex;
/// Replication strategy configured for a target.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplicationMode {
    /// One side replicates to a passive peer.
    ActivePassive,
    /// Both sides accept writes.
    ActiveActive,
    /// Snapshot-based replication.
    Snapshot,
}

impl ReplicationMode {
    /// Human-readable label for this mode, used in log output.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::ActivePassive => "Active-Passive",
            Self::ActiveActive => "Active-Active",
            Self::Snapshot => "Snapshot",
        }
    }
}
/// Observed synchronization state of a replication target.
/// Derived in `ReplicationTarget::update_state` from pending bytes and lag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplicationState {
    /// Transferring data; lag below 10s but not yet fully caught up.
    Syncing,
    /// No pending bytes and lag under 1s.
    Synchronized,
    /// Lag of 10s or more.
    Lagging,
    /// Target is not connected.
    Disconnected,
    /// Concurrent-update conflict. NOTE(review): never assigned in this file — confirm who sets it.
    Conflict,
}
/// Policy applied when `handle_remote_write` detects concurrent versions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictResolution {
    /// Accept the incoming write unconditionally.
    LastWriteWins,
    /// Reject the remote write if it causally precedes the local version.
    VersionVector,
    /// Refuse automatic resolution; the write errors out for operator action.
    Manual,
}
/// A remote replication peer plus its last-known sync status.
#[derive(Debug, Clone)]
pub struct ReplicationTarget {
    /// Unique target id; also the key in the manager's target map.
    pub id: u64,
    /// Network address (only used for logging in this file).
    pub address: String,
    /// Replication strategy for this peer.
    pub mode: ReplicationMode,
    /// Last derived synchronization state.
    pub state: ReplicationState,
    /// Timestamp of the most recent connect/sync update.
    pub last_sync: u64,
    /// Bytes awaiting replication as of the last observation.
    pub pending_bytes: u64,
    /// Observed replication lag in milliseconds.
    pub lag_ms: u64,
    /// True once `connect()` has succeeded.
    pub connected: bool,
}
impl ReplicationTarget {
    /// Builds a target that starts disconnected with all counters zeroed.
    pub fn new(id: u64, address: String, mode: ReplicationMode) -> Self {
        Self {
            id,
            address,
            mode,
            state: ReplicationState::Disconnected,
            last_sync: 0,
            pending_bytes: 0,
            lag_ms: 0,
            connected: false,
        }
    }

    /// Marks the target connected and transitions it to `Syncing`.
    /// `timestamp` becomes the initial last-sync time.
    /// Errors if the target is already connected.
    pub fn connect(&mut self, timestamp: u64) -> Result<(), &'static str> {
        if self.connected {
            return Err("Already connected");
        }
        self.connected = true;
        self.last_sync = timestamp;
        self.state = ReplicationState::Syncing;
        crate::lcpfs_println!(
            "[ REPL ] Connected to {} (mode: {})",
            self.address,
            self.mode.name()
        );
        Ok(())
    }

    /// Records the latest pending-bytes/lag observation and derives the
    /// replication state from it.
    pub fn update_state(&mut self, pending_bytes: u64, lag_ms: u64, timestamp: u64) {
        self.pending_bytes = pending_bytes;
        self.lag_ms = lag_ms;
        self.last_sync = timestamp;
        // Classification thresholds: fully synced under 1s with nothing
        // pending, syncing under 10s, lagging otherwise.
        self.state = match (self.connected, pending_bytes, lag_ms) {
            (false, _, _) => ReplicationState::Disconnected,
            (true, 0, lag) if lag < 1000 => ReplicationState::Synchronized,
            (true, _, lag) if lag < 10_000 => ReplicationState::Syncing,
            _ => ReplicationState::Lagging,
        };
    }
}
/// One record in the in-memory replication log.
#[derive(Debug, Clone)]
pub struct ReplicationLogEntry {
    /// Monotonic id assigned by `ReplicationManager::append_log` (starts at 1).
    pub id: u64,
    /// Transaction group the operation belongs to — presumably a filesystem
    /// txg; confirm against the caller.
    pub txg: u64,
    /// Kind of operation recorded.
    pub operation: ReplicationOp,
    /// Payload size in bytes.
    pub size: u64,
    /// Time the entry was appended.
    pub timestamp: u64,
    /// Integrity value; currently computed as `timestamp ^ size`.
    pub checksum: u64,
}
/// Kind of operation captured in a replication log entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplicationOp {
    Write,
    Delete,
    SetProperty,
    CreateSnapshot,
    DestroySnapshot,
}
/// A version vector mapping node id -> logical clock, used to order
/// replicated writes and detect concurrent updates. Nodes absent from the
/// map are implicitly at version 0.
#[derive(Debug, Clone)]
pub struct VersionVector {
    versions: BTreeMap<u64, u64>,
}

impl Default for VersionVector {
    fn default() -> Self {
        Self::new()
    }
}

impl VersionVector {
    /// Creates an empty vector (every node at version 0).
    pub fn new() -> Self {
        Self {
            versions: BTreeMap::new(),
        }
    }

    /// Advances this node's component by one.
    pub fn increment(&mut self, node_id: u64) {
        *self.versions.entry(node_id).or_insert(0) += 1;
    }

    /// Returns the component for `node_id`, or 0 if absent.
    pub fn get(&self, node_id: u64) -> u64 {
        self.versions.get(&node_id).copied().unwrap_or(0)
    }

    /// True iff `self` causally precedes `other`: every component of `self`
    /// is <= the matching component of `other`, and at least one component is
    /// strictly smaller (including components absent from `self`, which
    /// count as 0).
    ///
    /// Bug fix: the previous implementation iterated only `self.versions`,
    /// so a node present in `other` but absent from `self` never set the
    /// "strictly less" flag — e.g. {1:1} vs {1:1, 2:1} was wrongly reported
    /// as concurrent instead of ordered.
    ///
    /// NOTE: identical vectors compare as NOT happens-before in either
    /// direction and therefore as "concurrent" — preexisting behavior,
    /// preserved.
    pub fn happens_before(&self, other: &VersionVector) -> bool {
        // No component of self may exceed other's.
        for (&node_id, &version) in &self.versions {
            if version > other.get(node_id) {
                return false;
            }
        }
        // At least one of other's components must strictly dominate ours;
        // iterating `other` also covers nodes missing from `self`.
        other
            .versions
            .iter()
            .any(|(&node_id, &version)| self.get(node_id) < version)
    }

    /// True iff neither vector causally precedes the other (concurrent writes).
    pub fn is_concurrent(&self, other: &VersionVector) -> bool {
        !self.happens_before(other) && !other.happens_before(self)
    }
}
/// Cumulative replication counters; all values only ever increase.
#[derive(Debug, Clone, Default)]
pub struct ReplicationStats {
    /// Total bytes shipped by `replicate`.
    pub bytes_replicated: u64,
    /// Total log entries shipped by `replicate`.
    pub entries_replicated: u64,
    /// Error count. NOTE(review): never incremented in this file.
    pub errors: u64,
    /// Concurrent-write conflicts detected.
    pub conflicts: u64,
    /// Conflicts resolved automatically (LWW or version-vector policy).
    pub conflicts_resolved: u64,
}
lazy_static! {
    // Global singleton guarding all replication state. spin::Mutex because
    // this appears to run in a no_std/alloc environment (see imports).
    static ref REPLICATION_MANAGER: Mutex<ReplicationManager> = Mutex::new(ReplicationManager::new());
}
/// Central replication coordinator: owns the target set, the append-only
/// operation log, the local version vector, and cumulative statistics.
pub struct ReplicationManager {
    // This node's id in version vectors (defaults to 1).
    local_id: u64,
    // Replication targets keyed by target id.
    targets: BTreeMap<u64, ReplicationTarget>,
    // Append-only operation log.
    log: Vec<ReplicationLogEntry>,
    // Next id handed out by append_log (starts at 1).
    next_log_id: u64,
    // Local causal clock; bumped on every append_log.
    version: VersionVector,
    // Policy for concurrent remote writes.
    conflict_resolution: ConflictResolution,
    // Cumulative counters.
    stats: ReplicationStats,
}
impl Default for ReplicationManager {
    /// Delegates to [`ReplicationManager::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl ReplicationManager {
    /// Creates a manager with local node id 1, an empty log, and
    /// last-write-wins conflict resolution.
    pub fn new() -> Self {
        Self {
            local_id: 1,
            targets: BTreeMap::new(),
            log: Vec::new(),
            next_log_id: 1,
            version: VersionVector::new(),
            conflict_resolution: ConflictResolution::LastWriteWins,
            stats: ReplicationStats::default(),
        }
    }

    /// Sets the id under which this node's writes are versioned.
    pub fn set_local_id(&mut self, id: u64) {
        self.local_id = id;
    }

    /// Registers (or replaces) a replication target, keyed by its id.
    pub fn add_target(&mut self, target: ReplicationTarget) {
        crate::lcpfs_println!(
            "[ REPL ] Added target {} at {} ({})",
            target.id,
            target.address,
            target.mode.name()
        );
        self.targets.insert(target.id, target);
    }

    /// Connects the given target; errors if it is unknown or already connected.
    pub fn connect(&mut self, target_id: u64, timestamp: u64) -> Result<(), &'static str> {
        let target = self.targets.get_mut(&target_id).ok_or("Target not found")?;
        target.connect(timestamp)
    }

    /// Appends an entry to the replication log and bumps the local version
    /// component. Returns the new entry's id.
    pub fn append_log(
        &mut self,
        txg: u64,
        operation: ReplicationOp,
        size: u64,
        timestamp: u64,
    ) -> u64 {
        let entry_id = self.next_log_id;
        self.next_log_id += 1;
        self.log.push(ReplicationLogEntry {
            id: entry_id,
            txg,
            operation,
            size,
            timestamp,
            // NOTE(review): `timestamp ^ size` is a placeholder, not a real
            // checksum of any payload — confirm intended.
            checksum: timestamp ^ size,
        });
        self.version.increment(self.local_id);
        entry_id
    }

    /// Ships all log entries with id >= `from_id` to `target_id`, updating
    /// cumulative stats and the target's sync state. Returns the number of
    /// entries shipped. Errors if the target is unknown or not connected.
    pub fn replicate(
        &mut self,
        target_id: u64,
        from_id: u64,
        timestamp: u64,
    ) -> Result<u64, &'static str> {
        let target = self.targets.get(&target_id).ok_or("Target not found")?;
        if !target.connected {
            return Err("Target not connected");
        }
        // Single pass over the log: sum byte sizes and count matching entries
        // without materializing an intermediate Vec of references
        // (was a needless_collect).
        let (bytes, count) = self
            .log
            .iter()
            .filter(|e| e.id >= from_id)
            .fold((0u64, 0u64), |(b, c), e| (b + e.size, c + 1));
        self.stats.bytes_replicated += bytes;
        self.stats.entries_replicated += count;
        // Re-borrow mutably to record the sync: pending drops to 0 and lag is
        // measured from the target's previous sync time.
        if let Some(target) = self.targets.get_mut(&target_id) {
            let lag_ms = timestamp.saturating_sub(target.last_sync);
            target.update_state(0, lag_ms, timestamp);
        }
        Ok(count)
    }

    /// Applies a remote write's version vector. Concurrent versions are
    /// counted as conflicts and resolved per `self.conflict_resolution`;
    /// on success the remote vector is merged in component-wise (max).
    ///
    /// `_remote_id` and `_timestamp` are currently unused (underscore-prefixed
    /// to document that; call sites are unaffected).
    pub fn handle_remote_write(
        &mut self,
        _remote_id: u64,
        remote_version: &VersionVector,
        _timestamp: u64,
    ) -> Result<(), &'static str> {
        if self.version.is_concurrent(remote_version) {
            self.stats.conflicts += 1;
            match self.conflict_resolution {
                // Accept the incoming write unconditionally.
                ConflictResolution::LastWriteWins => {
                    self.stats.conflicts_resolved += 1;
                }
                // Reject only if the remote write is causally older.
                ConflictResolution::VersionVector => {
                    if remote_version.happens_before(&self.version) {
                        return Err("Conflict: local version is newer");
                    }
                    self.stats.conflicts_resolved += 1;
                }
                ConflictResolution::Manual => {
                    return Err("Conflict requires manual resolution");
                }
            }
        }
        // Component-wise max merge; entry API does one lookup per node
        // instead of a get + entry pair.
        for (&node_id, &version) in &remote_version.versions {
            let local = self.version.versions.entry(node_id).or_insert(0);
            if version > *local {
                *local = version;
            }
        }
        Ok(())
    }

    /// Last observed lag (ms) for a target, if it exists.
    pub fn get_lag(&self, target_id: u64) -> Option<u64> {
        self.targets.get(&target_id).map(|t| t.lag_ms)
    }

    /// Ids of all targets currently in the `Synchronized` state.
    pub fn synchronized_targets(&self) -> Vec<u64> {
        self.targets
            .iter()
            .filter(|(_, t)| t.state == ReplicationState::Synchronized)
            .map(|(id, _)| *id)
            .collect()
    }

    /// Snapshot of the cumulative statistics.
    pub fn stats(&self) -> ReplicationStats {
        self.stats.clone()
    }
}
/// Stateless facade over the global `REPLICATION_MANAGER` singleton.
pub struct Replication;

impl Replication {
    /// Registers a replication target with the global manager.
    pub fn add_target(target: ReplicationTarget) {
        REPLICATION_MANAGER.lock().add_target(target);
    }

    /// Connects the given target, recording `timestamp` as its sync time.
    pub fn connect(target_id: u64, timestamp: u64) -> Result<(), &'static str> {
        REPLICATION_MANAGER.lock().connect(target_id, timestamp)
    }

    /// Appends an operation to the global replication log; returns the entry id.
    pub fn append_log(txg: u64, operation: ReplicationOp, size: u64, timestamp: u64) -> u64 {
        REPLICATION_MANAGER.lock().append_log(txg, operation, size, timestamp)
    }

    /// Ships log entries with id >= `from_id` to `target_id`.
    pub fn replicate(target_id: u64, from_id: u64, timestamp: u64) -> Result<u64, &'static str> {
        REPLICATION_MANAGER.lock().replicate(target_id, from_id, timestamp)
    }

    /// Snapshot of the cumulative replication statistics.
    pub fn stats() -> ReplicationStats {
        REPLICATION_MANAGER.lock().stats()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Display labels for replication modes.
    #[test]
    fn test_replication_mode() {
        assert_eq!(ReplicationMode::ActivePassive.name(), "Active-Passive");
        assert_eq!(ReplicationMode::ActiveActive.name(), "Active-Active");
    }

    // New targets start disconnected with zeroed state.
    #[test]
    fn test_target_creation() {
        let target = ReplicationTarget::new(
            1,
            "192.168.1.100:7777".into(),
            ReplicationMode::ActivePassive,
        );
        assert_eq!(target.id, 1);
        assert_eq!(target.state, ReplicationState::Disconnected);
        assert!(!target.connected);
    }

    // connect() flips the flag and moves the target to Syncing.
    #[test]
    fn test_target_connection() {
        let mut target = ReplicationTarget::new(
            1,
            "192.168.1.100:7777".into(),
            ReplicationMode::ActivePassive,
        );
        assert!(target.connect(1000).is_ok());
        assert!(target.connected);
        assert_eq!(target.state, ReplicationState::Syncing);
    }

    // v1 = {1:1, 2:1}, v2 = {1:2, 2:1, 3:1}: componentwise <= with a strict
    // inequality, so v1 causally precedes v2 but not vice versa.
    #[test]
    fn test_version_vector() {
        let mut v1 = VersionVector::new();
        let mut v2 = VersionVector::new();
        v1.increment(1);
        v1.increment(2);
        v2.increment(1);
        v2.increment(1);
        v2.increment(2);
        v2.increment(3);
        assert!(v1.happens_before(&v2));
        assert!(!v2.happens_before(&v1));
    }

    // Vectors over disjoint node sets are concurrent in both directions.
    #[test]
    fn test_concurrent_vectors() {
        let mut v1 = VersionVector::new();
        let mut v2 = VersionVector::new();
        v1.increment(1);
        v1.increment(1);
        v2.increment(2);
        v2.increment(2);
        assert!(v1.is_concurrent(&v2));
        assert!(v2.is_concurrent(&v1));
    }

    // Adding a target stores it in the manager's map.
    #[test]
    fn test_manager_basic() {
        let mut mgr = ReplicationManager::new();
        let target =
            ReplicationTarget::new(2, "10.0.0.2:7777".into(), ReplicationMode::ActivePassive);
        mgr.add_target(target);
        assert_eq!(mgr.targets.len(), 1);
    }

    // Log ids are handed out sequentially starting at 1.
    #[test]
    fn test_log_append() {
        let mut mgr = ReplicationManager::new();
        let entry_id = mgr.append_log(1, ReplicationOp::Write, 4096, 1000);
        assert_eq!(entry_id, 1);
        let entry_id2 = mgr.append_log(2, ReplicationOp::Delete, 0, 1100);
        assert_eq!(entry_id2, 2);
        assert_eq!(mgr.log.len(), 2);
    }

    // replicate() ships all entries with id >= from_id and accumulates
    // byte/entry counters.
    #[test]
    fn test_replication() {
        let mut mgr = ReplicationManager::new();
        let mut target =
            ReplicationTarget::new(2, "10.0.0.2:7777".into(), ReplicationMode::ActivePassive);
        target
            .connect(1000)
            .expect("test: operation should succeed");
        mgr.add_target(target);
        mgr.append_log(1, ReplicationOp::Write, 4096, 1000);
        mgr.append_log(2, ReplicationOp::Write, 8192, 1100);
        let count = mgr
            .replicate(2, 1, 1200)
            .expect("test: operation should succeed");
        assert_eq!(count, 2);
        assert_eq!(mgr.stats.entries_replicated, 2);
        // 4096 + 8192 bytes shipped.
        assert_eq!(mgr.stats.bytes_replicated, 12288);
    }

    // Disjoint local/remote vectors are concurrent; LastWriteWins resolves
    // the conflict automatically.
    #[test]
    fn test_conflict_detection() {
        let mut mgr = ReplicationManager::new();
        mgr.set_local_id(1);
        mgr.version.increment(1);
        mgr.version.increment(1);
        let mut remote_version = VersionVector::new();
        remote_version.increment(2);
        remote_version.increment(2);
        mgr.conflict_resolution = ConflictResolution::LastWriteWins;
        let result = mgr.handle_remote_write(2, &remote_version, 2000);
        assert!(result.is_ok());
        assert_eq!(mgr.stats.conflicts, 1);
        assert_eq!(mgr.stats.conflicts_resolved, 1);
    }

    // State derivation: 0 pending + lag < 1000 => Synchronized;
    // lag >= 10_000 => Lagging.
    #[test]
    fn test_state_updates() {
        let mut target =
            ReplicationTarget::new(1, "10.0.0.1:7777".into(), ReplicationMode::ActivePassive);
        target
            .connect(1000)
            .expect("test: operation should succeed");
        target.update_state(0, 500, 2000);
        assert_eq!(target.state, ReplicationState::Synchronized);
        target.update_state(1000000, 15000, 17000);
        assert_eq!(target.state, ReplicationState::Lagging);
    }

    // Only targets in the Synchronized state are reported.
    #[test]
    fn test_synchronized_targets() {
        let mut mgr = ReplicationManager::new();
        let mut t1 =
            ReplicationTarget::new(1, "10.0.0.1:7777".into(), ReplicationMode::ActivePassive);
        let mut t2 =
            ReplicationTarget::new(2, "10.0.0.2:7777".into(), ReplicationMode::ActivePassive);
        t1.connect(1000).expect("test: operation should succeed");
        t2.connect(1000).expect("test: operation should succeed");
        t1.update_state(0, 500, 1500);
        t2.update_state(1000, 5000, 6000);
        mgr.add_target(t1);
        mgr.add_target(t2);
        let synced = mgr.synchronized_targets();
        assert_eq!(synced.len(), 1);
        assert_eq!(synced[0], 1);
    }

    // handle_remote_write merges the remote vector component-wise (max).
    #[test]
    fn test_version_merge() {
        let mut mgr = ReplicationManager::new();
        mgr.set_local_id(1);
        mgr.version.increment(1);
        mgr.version.increment(1);
        let mut remote_version = VersionVector::new();
        remote_version.increment(1);
        remote_version.increment(2);
        remote_version.increment(2);
        remote_version.increment(2);
        mgr.handle_remote_write(2, &remote_version, 1000).ok();
        // max(2, 1) for node 1; max(0, 3) for node 2.
        assert_eq!(mgr.version.get(1), 2);
        assert_eq!(mgr.version.get(2), 3);
    }
}