#![allow(dead_code)]
use crate::model::{Triple, TriplePattern};
use crate::OxirsError;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, Mutex, RwLock};
use tokio::time::interval;
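/// Configuration for one region participating in multi-region replication,
/// covering peer topology, replication strategy, conflict handling,
/// networking, and WAL persistence.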
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
pub region_id: String,
pub region: RegionConfig,
pub peers: Vec<RegionPeer>,
pub strategy: ReplicationStrategy,
pub conflict_resolution: ConflictResolution,
pub network: NetworkConfig,
pub persistence: PersistenceConfig,
}
#[derive(Debug, Clone)]
pub struct RegionConfig {
pub name: String,
pub location: GeographicLocation,
pub availability_zones: Vec<String>,
pub capacity: RegionCapacity,
}
#[derive(Debug, Clone)]
pub struct GeographicLocation {
pub latitude: f64,
pub longitude: f64,
pub continent: String,
pub country: String,
}
#[derive(Debug, Clone)]
pub struct RegionCapacity {
pub read_units: u32,
pub write_units: u32,
pub storage_gb: u32,
pub auto_scaling: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionPeer {
pub region_id: String,
pub endpoints: Vec<SocketAddr>,
pub priority: u32,
pub active: bool,
}
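/// How writes are propagated to peers: `SyncAll` waits for every peer,
/// `SyncQuorum` waits until `n` replicas (including the local one) have
/// accepted the write, `AsyncAll` is fire-and-forget, and the remaining
/// variants describe routed or adaptive topologies.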
#[derive(Debug, Clone)]
pub enum ReplicationStrategy {
SyncAll,
SyncQuorum { n: usize },
AsyncAll,
Chain { order: Vec<String> },
Hierarchical { topology: ReplicationTopology },
Adaptive,
}
#[derive(Debug, Clone)]
pub struct ReplicationTopology {
pub primary: Vec<String>,
pub secondary: Vec<String>,
pub edge: Vec<String>,
}
#[derive(Debug, Clone)]
pub enum ConflictResolution {
LastWriteWins,
VectorClock,
Custom(String),
MultiValue,
RegionPriority,
}
#[derive(Debug, Clone)]
pub struct NetworkConfig {
pub connect_timeout: Duration,
pub request_timeout: Duration,
pub max_retries: u32,
pub compression: bool,
pub encryption: EncryptionConfig,
}
#[derive(Debug, Clone)]
pub struct EncryptionConfig {
pub tls_enabled: bool,
pub cert_path: Option<String>,
pub key_path: Option<String>,
pub ca_path: Option<String>,
}
#[derive(Debug, Clone)]
pub struct PersistenceConfig {
pub wal_path: String,
pub checkpoint_interval: Duration,
pub max_wal_size: usize,
pub wal_compression: bool,
}
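/// Wire-level operations exchanged between regions.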
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationOp {
Insert(VersionedTriple),
Delete(VersionedTriple),
Batch(Vec<ReplicationOp>),
SnapshotChunk(SnapshotChunk),
Heartbeat(HeartbeatInfo),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionedTriple {
pub triple: Triple,
pub version: VectorClock,
pub timestamp: u64,
pub origin_region: String,
pub tx_id: Option<String>,
}
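/// A logical clock mapping each region id to the number of events observed
/// from that region; used to establish causal order between replicated writes.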
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct VectorClock {
pub entries: HashMap<String, u64>,
}
impl Default for VectorClock {
fn default() -> Self {
Self::new()
}
}
impl VectorClock {
pub fn new() -> Self {
VectorClock {
entries: HashMap::new(),
}
}
pub fn increment(&mut self, region: &str) {
let counter = self.entries.entry(region.to_string()).or_insert(0);
*counter += 1;
}
pub fn merge(&mut self, other: &VectorClock) {
for (region, &count) in &other.entries {
let entry = self.entries.entry(region.clone()).or_insert(0);
*entry = (*entry).max(count);
}
}
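/// Two clocks are concurrent when neither causally precedes the other; such
/// writes need conflict resolution.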
pub fn is_concurrent(&self, other: &VectorClock) -> bool {
!self.happens_before(other) && !other.happens_before(self)
}
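/// Returns true when `self` causally precedes `other`: every entry of `self`
/// is <= the matching entry of `other`, with at least one strict inequality
/// (regions missing from `self` count as zero).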
pub fn happens_before(&self, other: &VectorClock) -> bool {
let mut all_leq = true;
let mut exists_lt = false;
for (region, &count) in &self.entries {
let other_count = other.entries.get(region).copied().unwrap_or(0);
if count > other_count {
all_leq = false;
break;
}
if count < other_count {
exists_lt = true;
}
}
for region in other.entries.keys() {
if !self.entries.contains_key(region) && other.entries[region] > 0 {
exists_lt = true;
}
}
all_leq && exists_lt
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotChunk {
pub snapshot_id: String,
pub chunk_index: u64,
pub total_chunks: u64,
pub data: Vec<u8>,
pub checksum: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HeartbeatInfo {
pub region_id: String,
pub timestamp: u64,
pub load: LoadMetrics,
pub lag_ms: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoadMetrics {
pub cpu_percent: f32,
pub memory_percent: f32,
pub disk_percent: f32,
pub network_mbps: f32,
pub connections: u32,
}
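/// Coordinates multi-region replication: applies local writes, fans them out
/// to peers per the configured strategy, ingests remote operations, and
/// resolves conflicts detected via vector clocks.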
pub struct ReplicationManager {
config: ReplicationConfig,
storage: Arc<RwLock<ReplicationStorage>>,
state: Arc<RwLock<ReplicationState>>,
network: Arc<NetworkManager>,
resolver: Arc<ConflictResolver>,
wal: Arc<RwLock<WriteAheadLog>>,
stats: Arc<RwLock<ReplicationStats>>,
}
#[allow(dead_code)]
struct ReplicationStorage {
triples: HashMap<Triple, VersionedTriple>,
conflicts: HashMap<Triple, Vec<VersionedTriple>>,
#[allow(dead_code)]
pending_ops: VecDeque<ReplicationOp>,
}
#[allow(dead_code)]
struct ReplicationState {
vector_clock: VectorClock,
peer_states: HashMap<String, PeerState>,
active_snapshots: HashMap<String, SnapshotTransfer>,
status: ReplicationStatus,
}
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct PeerState {
last_seen: Instant,
last_clock: VectorClock,
connected: bool,
lag_ms: u64,
in_flight: u64,
}
#[allow(dead_code)]
struct SnapshotTransfer {
id: String,
direction: TransferDirection,
chunks_transferred: u64,
total_chunks: u64,
start_time: Instant,
}
#[derive(Debug)]
enum TransferDirection {
Send,
Receive,
}
#[derive(Debug, Clone)]
enum ReplicationStatus {
Healthy,
Degraded,
PartialOutage,
FullOutage,
}
struct NetworkManager {
connections: Arc<RwLock<HashMap<String, PeerConnection>>>,
message_tx: mpsc::Sender<NetworkMessage>,
message_rx: Arc<Mutex<mpsc::Receiver<NetworkMessage>>>,
}
struct PeerConnection {
region_id: String,
handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
send_tx: mpsc::Sender<ReplicationOp>,
}
#[derive(Debug)]
enum NetworkMessage {
IncomingOp {
_from_region: String,
_op: Box<ReplicationOp>,
},
ConnectionEvent {
region_id: String,
event: ConnectionEvent,
},
}
#[derive(Debug)]
enum ConnectionEvent {
Connected,
Disconnected,
Error(String),
}
struct ConflictResolver {
strategy: ConflictResolution,
region_priorities: HashMap<String, u32>,
}
struct WriteAheadLog {
current_file: Option<std::fs::File>,
entries: VecDeque<WalEntry>,
current_size: usize,
config: PersistenceConfig,
}
#[derive(Debug, Serialize, Deserialize)]
struct WalEntry {
seq: u64,
timestamp: u64,
op: ReplicationOp,
checksum: u32,
}
#[derive(Debug, Default)]
struct ReplicationStats {
ops_sent: u64,
ops_received: u64,
conflicts_detected: u64,
conflicts_resolved: u64,
bytes_sent: u64,
bytes_received: u64,
avg_lag_ms: f64,
}
impl ReplicationManager {
pub async fn new(config: ReplicationConfig) -> Result<Self, OxirsError> {
let (message_tx, message_rx) = mpsc::channel(10000);
let storage = Arc::new(RwLock::new(ReplicationStorage {
triples: HashMap::new(),
conflicts: HashMap::new(),
pending_ops: VecDeque::new(),
}));
let state = Arc::new(RwLock::new(ReplicationState {
vector_clock: VectorClock::new(),
peer_states: HashMap::new(),
active_snapshots: HashMap::new(),
status: ReplicationStatus::Healthy,
}));
let network = Arc::new(NetworkManager {
connections: Arc::new(RwLock::new(HashMap::new())),
message_tx,
message_rx: Arc::new(Mutex::new(message_rx)),
});
let resolver = Arc::new(ConflictResolver {
strategy: config.conflict_resolution.clone(),
region_priorities: HashMap::new(),
});
let wal = Arc::new(RwLock::new(WriteAheadLog {
current_file: None,
entries: VecDeque::new(),
current_size: 0,
config: config.persistence.clone(),
}));
Ok(ReplicationManager {
config,
storage,
state,
network,
resolver,
wal,
stats: Arc::new(RwLock::new(ReplicationStats::default())),
})
}
pub async fn start(&self) -> Result<(), OxirsError> {
self.initialize_wal().await?;
self.connect_to_peers().await?;
self.spawn_message_processor();
self.spawn_heartbeat_sender();
self.spawn_lag_monitor();
self.spawn_wal_checkpoint();
Ok(())
}
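/// Replicates a local write: bumps this region's vector clock, appends the
/// operation to the WAL, applies it locally, then fans it out to peers
/// according to the configured strategy.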
pub async fn replicate_write(&self, triple: Triple, op_type: OpType) -> Result<(), OxirsError> {
let mut state = self.state.write().await;
state.vector_clock.increment(&self.config.region_id);
let versioned = VersionedTriple {
triple: triple.clone(),
version: state.vector_clock.clone(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("SystemTime should be after UNIX_EPOCH")
.as_secs(),
origin_region: self.config.region_id.clone(),
tx_id: Some(uuid::Uuid::new_v4().to_string()),
};
// Release the state lock before WAL and network I/O so readers are not blocked.
drop(state);
let op = match op_type {
OpType::Insert => ReplicationOp::Insert(versioned.clone()),
OpType::Delete => ReplicationOp::Delete(versioned.clone()),
};
self.write_to_wal(&op).await?;
let mut storage = self.storage.write().await;
match op_type {
OpType::Insert => {
storage.triples.insert(triple, versioned);
}
OpType::Delete => {
storage.triples.remove(&triple);
}
}
// Release the storage lock before fanning out over the network.
drop(storage);
match &self.config.strategy {
ReplicationStrategy::SyncAll => {
self.replicate_sync_all(op).await?;
}
ReplicationStrategy::AsyncAll => {
self.replicate_async_all(op).await?;
}
ReplicationStrategy::SyncQuorum { n } => {
self.replicate_sync_quorum(op, *n).await?;
}
_ => {
self.replicate_async_all(op).await?;
}
}
Ok(())
}
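/// Reads matching triples at the requested consistency level: `Strong` syncs
/// with a quorum first, `BoundedStaleness` fails if any peer lags beyond the
/// bound, and `Eventual` serves local state as-is.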
pub async fn query(
&self,
pattern: &TriplePattern,
consistency: ConsistencyLevel,
) -> Result<Vec<Triple>, OxirsError> {
match consistency {
ConsistencyLevel::Strong => {
self.sync_with_quorum().await?;
}
ConsistencyLevel::BoundedStaleness { max_lag_ms } => {
let state = self.state.read().await;
for peer in state.peer_states.values() {
if peer.lag_ms > max_lag_ms {
return Err(OxirsError::Store(
"Data staleness exceeds bound".to_string(),
));
}
}
}
// Eventual consistency serves whatever the local replica currently holds.
ConsistencyLevel::Eventual => {}
}
let storage = self.storage.read().await;
let results: Vec<Triple> = storage
.triples
.values()
.filter(|vt| pattern.matches(&vt.triple))
.map(|vt| vt.triple.clone())
.collect();
Ok(results)
}
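/// Applies an operation received from a peer. Versions judged concurrent by
/// their vector clocks go through the conflict resolver, causally stale
/// versions are dropped, and causally newer versions are applied directly
/// (inserts overwrite, deletes remove).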
async fn handle_incoming_op(
&self,
from_region: String,
op: ReplicationOp,
) -> Result<(), OxirsError> {
let mut stats = self.stats.write().await;
stats.ops_received += 1;
drop(stats);
match &op {
ReplicationOp::Insert(versioned) | ReplicationOp::Delete(versioned) => {
let mut storage = self.storage.write().await;
let mut state = self.state.write().await;
if let Some(existing) = storage.triples.get(&versioned.triple) {
if existing.version.is_concurrent(&versioned.version) {
let mut stats = self.stats.write().await;
stats.conflicts_detected += 1;
drop(stats);
let winner = self.resolver.resolve_conflict(existing, versioned).await?;
storage.triples.insert(versioned.triple.clone(), winner);
storage
.conflicts
.entry(versioned.triple.clone())
.or_default()
.push(versioned.clone());
} else if versioned.version.happens_before(&existing.version) {
// The incoming version is causally stale; ignore it.
return Ok(());
} else if matches!(op, ReplicationOp::Insert(_)) {
storage
.triples
.insert(versioned.triple.clone(), versioned.clone());
} else {
// A newer remote delete removes the local entry instead of overwriting it.
storage.triples.remove(&versioned.triple);
}
} else if matches!(op, ReplicationOp::Insert(_)) {
storage
.triples
.insert(versioned.triple.clone(), versioned.clone());
}
state.vector_clock.merge(&versioned.version);
}
ReplicationOp::Batch(ops) => {
for op in ops {
Box::pin(self.handle_incoming_op(from_region.clone(), op.clone())).await?;
}
}
ReplicationOp::Heartbeat(info) => {
self.handle_heartbeat(from_region, info.clone()).await?;
}
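// Snapshot chunks and other op types are not handled yet.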
_ => {}
}
Ok(())
}
async fn initialize_wal(&self) -> Result<(), OxirsError> {
let mut wal = self.wal.write().await;
std::fs::create_dir_all(&wal.config.wal_path)?;
let wal_file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(format!("{}/wal.log", wal.config.wal_path))?;
wal.current_file = Some(wal_file);
Ok(())
}
async fn connect_to_peers(&self) -> Result<(), OxirsError> {
for peer in &self.config.peers {
if peer.active {
tracing::info!("Connecting to peer region: {}", peer.region_id);
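// Connection setup is simplified here: peers are registered as reachable
// and the actual transport links are expected to be established by the
// network layer.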
let mut state = self.state.write().await;
state.peer_states.insert(
peer.region_id.clone(),
PeerState {
last_seen: Instant::now(),
last_clock: VectorClock::new(),
connected: true,
lag_ms: 0,
in_flight: 0,
},
);
}
}
Ok(())
}
fn spawn_message_processor(&self) {
let storage = self.storage.clone();
let state = self.state.clone();
let network = self.network.clone();
let resolver = self.resolver.clone();
let stats = self.stats.clone();
tokio::spawn(async move {
let mut rx = network.message_rx.lock().await;
while let Some(msg) = rx.recv().await {
match msg {
NetworkMessage::IncomingOp { _from_region, _op } => {
let storage = storage.clone();
let state = state.clone();
let resolver = resolver.clone();
let stats = stats.clone();
tokio::spawn(async move {
if let Err(e) = Self::handle_incoming_op_static(
_from_region,
*_op,
storage,
state,
resolver,
stats,
)
.await
{
tracing::error!("Error handling incoming op: {}", e);
}
});
}
NetworkMessage::ConnectionEvent { region_id, event } => {
let mut state_guard = state.write().await;
if let Some(peer_state) = state_guard.peer_states.get_mut(&region_id) {
match event {
ConnectionEvent::Connected => {
peer_state.connected = true;
peer_state.last_seen = Instant::now();
}
ConnectionEvent::Disconnected => {
peer_state.connected = false;
}
ConnectionEvent::Error(err) => {
tracing::error!("Connection error for {}: {}", region_id, err);
peer_state.connected = false;
}
}
}
}
}
}
});
}
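/// Variant of `handle_incoming_op` for use from spawned tasks: takes the
/// shared components explicitly instead of borrowing `self`.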
async fn handle_incoming_op_static(
from_region: String,
op: ReplicationOp,
storage: Arc<RwLock<ReplicationStorage>>,
state: Arc<RwLock<ReplicationState>>,
resolver: Arc<ConflictResolver>,
stats: Arc<RwLock<ReplicationStats>>,
) -> Result<(), OxirsError> {
let mut stats_guard = stats.write().await;
stats_guard.ops_received += 1;
drop(stats_guard);
match &op {
ReplicationOp::Insert(versioned) | ReplicationOp::Delete(versioned) => {
let mut storage_guard = storage.write().await;
let mut state_guard = state.write().await;
if let Some(existing) = storage_guard.triples.get(&versioned.triple) {
if existing.version.is_concurrent(&versioned.version) {
let mut stats_guard = stats.write().await;
stats_guard.conflicts_detected += 1;
drop(stats_guard);
let winner = resolver.resolve_conflict(existing, versioned).await?;
storage_guard
.triples
.insert(versioned.triple.clone(), winner);
storage_guard
.conflicts
.entry(versioned.triple.clone())
.or_default()
.push(versioned.clone());
} else if versioned.version.happens_before(&existing.version) {
// The incoming version is causally stale; ignore it.
return Ok(());
} else if matches!(op, ReplicationOp::Insert(_)) {
storage_guard
.triples
.insert(versioned.triple.clone(), versioned.clone());
} else {
// A newer remote delete removes the local entry instead of overwriting it.
storage_guard.triples.remove(&versioned.triple);
}
} else if matches!(op, ReplicationOp::Insert(_)) {
storage_guard
.triples
.insert(versioned.triple.clone(), versioned.clone());
}
state_guard.vector_clock.merge(&versioned.version);
}
ReplicationOp::Batch(ops) => {
for op_item in ops {
Box::pin(Self::handle_incoming_op_static(
from_region.clone(),
op_item.clone(),
storage.clone(),
state.clone(),
resolver.clone(),
stats.clone(),
))
.await?;
}
}
ReplicationOp::Heartbeat(info) => {
let mut state_guard = state.write().await;
if let Some(peer) = state_guard.peer_states.get_mut(&from_region) {
peer.last_seen = std::time::Instant::now();
peer.lag_ms = info.lag_ms;
}
}
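// Snapshot chunks and other op types are not handled yet.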
_ => {}
}
Ok(())
}
fn spawn_heartbeat_sender(&self) {
let config = self.config.clone();
let _state = self.state.clone();
let network = self.network.clone();
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(5));
loop {
interval.tick().await;
let heartbeat = ReplicationOp::Heartbeat(HeartbeatInfo {
region_id: config.region_id.clone(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("SystemTime should be after UNIX_EPOCH")
.as_secs(),
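// Placeholder load metrics; a full implementation would sample real
// CPU, memory, disk, and network figures here.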
load: LoadMetrics {
cpu_percent: 0.0,
memory_percent: 0.0,
disk_percent: 0.0,
network_mbps: 0.0,
connections: 0,
},
lag_ms: 0,
});
let connections = network.connections.read().await;
for (_, conn) in connections.iter() {
let _ = conn.send_tx.send(heartbeat.clone()).await;
}
}
});
}
fn spawn_lag_monitor(&self) {
let state = self.state.clone();
let stats = self.stats.clone();
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(10));
loop {
interval.tick().await;
let state_guard = state.read().await;
let mut total_lag = 0u64;
let mut peer_count = 0u64;
for peer_state in state_guard.peer_states.values() {
total_lag += peer_state.lag_ms;
peer_count += 1;
}
if peer_count > 0 {
let mut stats_guard = stats.write().await;
stats_guard.avg_lag_ms = total_lag as f64 / peer_count as f64;
}
}
});
}
fn spawn_wal_checkpoint(&self) {
let wal = self.wal.clone();
let config = self.config.persistence.clone();
tokio::spawn(async move {
let mut interval = interval(config.checkpoint_interval);
loop {
interval.tick().await;
let mut wal_guard = wal.write().await;
if wal_guard.current_size > config.max_wal_size {
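// Only the size counter is reset here; rotating the on-disk segment
// itself (flush, rename, new file) is not implemented yet.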
tracing::info!("Rotating WAL file");
wal_guard.current_size = 0;
}
}
});
}
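/// Appends an operation to the write-ahead log, fsyncing the file before the
/// write is acknowledged.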
async fn write_to_wal(&self, op: &ReplicationOp) -> Result<(), OxirsError> {
let mut wal = self.wal.write().await;
let entry = WalEntry {
seq: wal.entries.len() as u64,
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("SystemTime should be after UNIX_EPOCH")
.as_secs(),
op: op.clone(),
// The checksum is a placeholder; a real entry would carry a CRC over `op`.
checksum: 0,
};
let serialized = oxicode::serde::encode_to_vec(&entry, oxicode::config::standard())?;
if let Some(ref mut file) = wal.current_file {
use std::io::Write;
file.write_all(&serialized)?;
file.sync_all()?;
}
wal.entries.push_back(entry);
wal.current_size += serialized.len();
Ok(())
}
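/// Fans the operation out to all connected peers, failing if any peer's send
/// queue rejects it; delivery itself is handled by the per-peer connection
/// tasks.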
async fn replicate_sync_all(&self, op: ReplicationOp) -> Result<(), OxirsError> {
let connections = self.network.connections.read().await;
let mut futures = Vec::new();
for (_region_id, conn) in connections.iter() {
let tx = conn.send_tx.clone();
let op_clone = op.clone();
let future = async move { tx.send(op_clone).await };
futures.push(future);
}
let results = futures::future::join_all(futures).await;
for result in results {
if result.is_err() {
return Err(OxirsError::Store("Replication failed".to_string()));
}
}
Ok(())
}
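/// Best-effort fan-out: tries to enqueue the operation for every connected
/// peer and silently skips peers whose queues are full or closed.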
async fn replicate_async_all(&self, op: ReplicationOp) -> Result<(), OxirsError> {
let connections = self.network.connections.read().await;
for (_, conn) in connections.iter() {
let _ = conn.send_tx.try_send(op.clone());
}
Ok(())
}
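/// Enqueues the operation to up to `n - 1` peers and succeeds once enough
/// sends complete to form a quorum of `n`, counting the already-applied
/// local write as one member.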
async fn replicate_sync_quorum(&self, op: ReplicationOp, n: usize) -> Result<(), OxirsError> {
// The local write already counts toward the quorum, so n <= 1 is satisfied
// immediately (and `n - 1` below cannot underflow).
if n <= 1 {
return Ok(());
}
let connections = self.network.connections.read().await;
if connections.len() < n - 1 {
return Err(OxirsError::Store(format!(
"Not enough regions for quorum: need {}, have {}",
n,
connections.len() + 1
)));
}
let mut futures = Vec::new();
for (_, conn) in connections.iter().take(n - 1) {
let tx = conn.send_tx.clone();
let op_clone = op.clone();
let future = async move { tx.send(op_clone).await };
futures.push(future);
}
let results = futures::future::join_all(futures).await;
let successes = results.iter().filter(|r| r.is_ok()).count();
if successes + 1 >= n {
Ok(())
} else {
Err(OxirsError::Store("Quorum not achieved".to_string()))
}
}
async fn sync_with_quorum(&self) -> Result<(), OxirsError> {
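// Placeholder: a full implementation would read-repair against a quorum of
// peers before serving a strongly consistent read.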
Ok(())
}
async fn handle_heartbeat(
&self,
from_region: String,
info: HeartbeatInfo,
) -> Result<(), OxirsError> {
let mut state = self.state.write().await;
if let Some(peer) = state.peer_states.get_mut(&from_region) {
peer.last_seen = Instant::now();
peer.lag_ms = info.lag_ms;
}
Ok(())
}
}
impl ConflictResolver {
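/// Picks a winner between two concurrent versions of the same triple
/// according to the configured strategy.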
async fn resolve_conflict(
&self,
existing: &VersionedTriple,
incoming: &VersionedTriple,
) -> Result<VersionedTriple, OxirsError> {
match &self.strategy {
ConflictResolution::LastWriteWins => {
if incoming.timestamp > existing.timestamp {
Ok(incoming.clone())
} else {
Ok(existing.clone())
}
}
ConflictResolution::VectorClock => {
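// Resolution is only invoked for concurrent versions, where vector clocks
// alone cannot pick a winner; keeping the existing value is an arbitrary
// but locally deterministic tiebreak.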
Ok(existing.clone())
}
ConflictResolution::RegionPriority => {
let existing_priority = self
.region_priorities
.get(&existing.origin_region)
.copied()
.unwrap_or(999);
let incoming_priority = self
.region_priorities
.get(&incoming.origin_region)
.copied()
.unwrap_or(999);
if incoming_priority < existing_priority {
Ok(incoming.clone())
} else {
Ok(existing.clone())
}
}
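// Custom and MultiValue strategies are not implemented yet; keep the
// existing value as a conservative default.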
_ => Ok(existing.clone()),
}
}
}
#[derive(Debug, Clone)]
pub enum OpType {
Insert,
Delete,
}
#[derive(Debug, Clone)]
pub enum ConsistencyLevel {
Strong,
BoundedStaleness { max_lag_ms: u64 },
Eventual,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::model::{Literal, NamedNode};
#[test]
fn test_vector_clock() {
let mut clock1 = VectorClock::new();
let mut clock2 = VectorClock::new();
clock1.increment("region1");
clock1.increment("region1");
clock2.increment("region2");
assert!(!clock1.happens_before(&clock2));
assert!(!clock2.happens_before(&clock1));
assert!(clock1.is_concurrent(&clock2));
clock2.merge(&clock1);
assert!(clock1.happens_before(&clock2));
}
#[tokio::test]
async fn test_replication_manager() {
let config = ReplicationConfig {
region_id: "us-east-1".to_string(),
region: RegionConfig {
name: "US East".to_string(),
location: GeographicLocation {
latitude: 38.7,
longitude: -77.0,
continent: "North America".to_string(),
country: "USA".to_string(),
},
availability_zones: vec!["us-east-1a".to_string(), "us-east-1b".to_string()],
capacity: RegionCapacity {
read_units: 1000,
write_units: 500,
storage_gb: 100,
auto_scaling: true,
},
},
peers: vec![],
strategy: ReplicationStrategy::AsyncAll,
conflict_resolution: ConflictResolution::LastWriteWins,
network: NetworkConfig {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(30),
max_retries: 3,
compression: true,
encryption: EncryptionConfig {
tls_enabled: true,
cert_path: None,
key_path: None,
ca_path: None,
},
},
persistence: PersistenceConfig {
wal_path: "/tmp/wal".to_string(),
checkpoint_interval: Duration::from_secs(300),
max_wal_size: 100 * 1024 * 1024,
wal_compression: true,
},
};
let manager = ReplicationManager::new(config)
.await
.expect("async operation should succeed");
let triple = Triple::new(
NamedNode::new("http://example.org/s").expect("valid IRI"),
NamedNode::new("http://example.org/p").expect("valid IRI"),
crate::model::Object::Literal(Literal::new("test")),
);
manager
.replicate_write(triple.clone(), OpType::Insert)
.await
.expect("operation should succeed");
let pattern = TriplePattern::new(None, None, None);
let results = manager
.query(&pattern, ConsistencyLevel::Eventual)
.await
.expect("operation should succeed");
assert_eq!(results.len(), 1);
assert_eq!(results[0], triple);
}
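// Exercises LastWriteWins resolution directly: the version with the higher
// timestamp should win. Timestamps and region ids are illustrative; the
// triple mirrors the one used in the test above.
#[tokio::test]
async fn test_last_write_wins_resolution() {
let resolver = ConflictResolver {
strategy: ConflictResolution::LastWriteWins,
region_priorities: HashMap::new(),
};
let triple = Triple::new(
NamedNode::new("http://example.org/s").expect("valid IRI"),
NamedNode::new("http://example.org/p").expect("valid IRI"),
crate::model::Object::Literal(Literal::new("test")),
);
let older = VersionedTriple {
triple: triple.clone(),
version: VectorClock::new(),
timestamp: 100,
origin_region: "us-east-1".to_string(),
tx_id: None,
};
let newer = VersionedTriple {
timestamp: 200,
origin_region: "eu-west-1".to_string(),
..older.clone()
};
let winner = resolver
.resolve_conflict(&older, &newer)
.await
.expect("resolution should succeed");
assert_eq!(winner.timestamp, 200);
assert_eq!(winner.origin_region, "eu-west-1");
}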
}