use crate::error::{ClusterError, Result};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, info};
/// A single RDF triple flowing through the streaming pipeline, with an
/// optional named-graph label.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct StreamTriple {
    /// Subject term; free-form string, only checked for non-emptiness by
    /// `validate` (no IRI syntax checking happens here).
    pub subject: String,
    /// Predicate term; same free-form treatment as `subject`.
    pub predicate: String,
    /// Object term; may be an IRI or a literal (tests use `"hello"`).
    pub object: String,
    /// Named graph this triple belongs to; `None` means the default graph.
    pub graph: Option<String>,
}
impl StreamTriple {
pub fn new(
subject: impl Into<String>,
predicate: impl Into<String>,
object: impl Into<String>,
) -> Self {
Self {
subject: subject.into(),
predicate: predicate.into(),
object: object.into(),
graph: None,
}
}
pub fn in_graph(
subject: impl Into<String>,
predicate: impl Into<String>,
object: impl Into<String>,
graph: impl Into<String>,
) -> Self {
Self {
subject: subject.into(),
predicate: predicate.into(),
object: object.into(),
graph: Some(graph.into()),
}
}
pub fn validate(&self) -> Result<()> {
if self.subject.is_empty() {
return Err(ClusterError::Config(
"Triple subject cannot be empty".into(),
));
}
if self.predicate.is_empty() {
return Err(ClusterError::Config(
"Triple predicate cannot be empty".into(),
));
}
if self.object.is_empty() {
return Err(ClusterError::Config("Triple object cannot be empty".into()));
}
Ok(())
}
}
impl std::fmt::Display for StreamTriple {
    /// Formats the triple as a line-based RDF statement.
    ///
    /// Default-graph triples render in N-Triples form (`<s> <p> o .`);
    /// triples with a named graph render in N-Quads form
    /// (`<s> <p> o <g> .`). Previously the graph label was silently
    /// dropped, making a named-graph triple indistinguishable from a
    /// default-graph one in text output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.graph {
            Some(graph) => write!(
                f,
                "<{}> <{}> {} <{}> .",
                self.subject, self.predicate, self.object, graph
            ),
            None => write!(
                f,
                "<{}> <{}> {} .",
                self.subject, self.predicate, self.object
            ),
        }
    }
}
/// The kind of mutation a `StreamMessage` carries.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum MutationOp {
    /// Add the message's triples.
    Insert,
    /// Remove the message's triples.
    Delete,
    /// Clear a whole graph. `None` presumably targets the default graph —
    /// no Truncate handler is visible in this file; confirm at call sites.
    Truncate {
        graph: Option<String>,
    },
}
/// One batch of triple mutations read from a stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamMessage {
    /// Identifier of the stream this message belongs to.
    pub stream_id: String,
    /// Position of this message within the stream.
    pub offset: u64,
    /// What to do with `triples`.
    pub op: MutationOp,
    /// Payload triples the operation applies to.
    pub triples: Vec<StreamTriple>,
    /// Creation time in milliseconds since the Unix epoch.
    pub timestamp_ms: u64,
}
impl StreamMessage {
pub fn insert(stream_id: impl Into<String>, offset: u64, triples: Vec<StreamTriple>) -> Self {
Self {
stream_id: stream_id.into(),
offset,
op: MutationOp::Insert,
triples,
timestamp_ms: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_millis() as u64,
}
}
pub fn delete(stream_id: impl Into<String>, offset: u64, triples: Vec<StreamTriple>) -> Self {
Self {
stream_id: stream_id.into(),
offset,
op: MutationOp::Delete,
triples,
timestamp_ms: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_millis() as u64,
}
}
}
/// Tuning knobs for `StreamingTripleConsumer`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsumerConfig {
    /// Maximum number of messages buffered before `submit` errors.
    pub max_buffer_size: usize,
    /// Flush cadence in milliseconds. Not consumed within this file —
    /// presumably read by an external driver loop; confirm at call sites.
    pub flush_interval_ms: u64,
    /// Maximum messages returned by a single `drain_batch` call.
    pub batch_size: usize,
    /// When true, every triple is validated during `submit`.
    pub validate_triples: bool,
}
impl Default for ConsumerConfig {
    /// Defaults: 10k-message buffer, 100 ms flush interval, 500-message
    /// batches, validation enabled.
    fn default() -> Self {
        Self {
            max_buffer_size: 10_000,
            flush_interval_ms: 100,
            batch_size: 500,
            validate_triples: true,
        }
    }
}
/// Lock-free counters tracking consumer activity. All updates use relaxed
/// atomics, so reads across several counters are not a consistent snapshot.
#[derive(Debug, Default)]
pub struct ConsumerStats {
    /// Messages accepted by `submit` (rejected messages are not counted).
    pub messages_consumed: AtomicU64,
    /// Triples carried by accepted `Insert` messages.
    pub triples_inserted: AtomicU64,
    /// Triples carried by accepted `Delete` messages.
    pub triples_deleted: AtomicU64,
    /// Triples that failed validation during `submit`.
    pub validation_errors: AtomicU64,
    /// Byte counter; never updated within this file — confirm intended use.
    pub bytes_processed: AtomicU64,
}
/// Buffered consumer of `StreamMessage`s with optional validation, shared
/// statistics, and a simple running flag.
pub struct StreamingTripleConsumer {
    // Limits and validation behavior; fixed at construction.
    config: ConsumerConfig,
    // FIFO of accepted, not-yet-drained messages.
    buffer: Arc<Mutex<VecDeque<StreamMessage>>>,
    // Shared counters; see ConsumerStats.
    stats: Arc<ConsumerStats>,
    // Toggled by start()/stop(); not consulted by submit/drain in this file.
    running: Arc<AtomicBool>,
}
impl StreamingTripleConsumer {
    /// Creates a consumer, rejecting configurations with a zero-sized
    /// buffer or batch.
    ///
    /// # Errors
    /// Returns a config error when `max_buffer_size` or `batch_size` is 0.
    pub fn new(config: ConsumerConfig) -> Result<Self> {
        // Zero-sized limits would make submit/drain degenerate; refuse early.
        if config.max_buffer_size == 0 {
            return Err(ClusterError::Config("max_buffer_size must be > 0".into()));
        }
        if config.batch_size == 0 {
            return Err(ClusterError::Config("batch_size must be > 0".into()));
        }
        let consumer = Self {
            config,
            buffer: Arc::new(Mutex::new(VecDeque::new())),
            stats: Arc::new(ConsumerStats::default()),
            running: Arc::new(AtomicBool::new(false)),
        };
        Ok(consumer)
    }

    /// Validates (if enabled), accounts for, and enqueues one message.
    ///
    /// # Errors
    /// Returns the triple's validation error or a resource-limit error when
    /// the buffer is at capacity; in both cases the message is not enqueued
    /// and only `validation_errors` (for the former) is incremented.
    pub async fn submit(&self, msg: StreamMessage) -> Result<()> {
        if self.config.validate_triples {
            for triple in msg.triples.iter() {
                if let Err(err) = triple.validate() {
                    // Count the rejection before surfacing it to the caller.
                    self.stats.validation_errors.fetch_add(1, Ordering::Relaxed);
                    return Err(err);
                }
            }
        }
        let mut queue = self.buffer.lock().await;
        if queue.len() >= self.config.max_buffer_size {
            return Err(ClusterError::ResourceLimit(
                "streaming consumer buffer is full".into(),
            ));
        }
        // Stats are only updated once the message is certain to be enqueued.
        let affected = msg.triples.len() as u64;
        self.stats.messages_consumed.fetch_add(1, Ordering::Relaxed);
        let per_op_counter = match msg.op {
            MutationOp::Insert => Some(&self.stats.triples_inserted),
            MutationOp::Delete => Some(&self.stats.triples_deleted),
            // Truncate carries no per-triple accounting.
            MutationOp::Truncate { .. } => None,
        };
        if let Some(counter) = per_op_counter {
            counter.fetch_add(affected, Ordering::Relaxed);
        }
        queue.push_back(msg);
        Ok(())
    }

    /// Removes and returns up to `batch_size` messages in FIFO order.
    pub async fn drain_batch(&self) -> Vec<StreamMessage> {
        let mut queue = self.buffer.lock().await;
        let take = queue.len().min(self.config.batch_size);
        queue.drain(..take).collect()
    }

    /// Number of messages currently buffered.
    pub async fn buffer_len(&self) -> usize {
        let queue = self.buffer.lock().await;
        queue.len()
    }

    /// Shared statistics counters for this consumer.
    pub fn stats(&self) -> &ConsumerStats {
        self.stats.as_ref()
    }

    /// Marks the consumer as running.
    pub fn start(&self) {
        self.running.store(true, Ordering::SeqCst);
        info!("StreamingTripleConsumer started");
    }

    /// Marks the consumer as stopped.
    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
        info!("StreamingTripleConsumer stopped");
    }

    /// Whether `start` was called more recently than `stop`.
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }
}
/// A `StreamMessage` after being assigned a log sequence number.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MutationEntry {
    /// Sequence number assigned by the log (sequences start at 1).
    pub seq: u64,
    /// The original message.
    pub message: StreamMessage,
    /// Node id of the log instance that assigned `seq`.
    pub assigned_by: u64,
}
/// Bounded, ordered log of mutations with a committed-sequence watermark.
pub struct StreamingMutationLog {
    // Entries keyed by sequence number; BTreeMap keeps range reads ordered.
    log: Arc<RwLock<BTreeMap<u64, MutationEntry>>>,
    // Next sequence number to hand out (starts at 1).
    next_seq: AtomicU64,
    // Highest committed sequence (0 = nothing committed yet).
    committed_seq: AtomicU64,
    // Maximum retained entries; oldest are evicted on append.
    max_capacity: usize,
    // Identity recorded as `assigned_by` on each appended entry.
    node_id: u64,
}
impl StreamingMutationLog {
    /// Creates a log for `node_id` retaining at most `max_capacity` entries.
    ///
    /// # Errors
    /// Returns a config error when `max_capacity` is zero.
    pub fn new(node_id: u64, max_capacity: usize) -> Result<Self> {
        if max_capacity == 0 {
            return Err(ClusterError::Config("max_capacity must be > 0".into()));
        }
        Ok(Self {
            log: Arc::new(RwLock::new(BTreeMap::new())),
            // Sequences start at 1 so committed_seq == 0 can mean "none".
            next_seq: AtomicU64::new(1),
            committed_seq: AtomicU64::new(0),
            max_capacity,
            node_id,
        })
    }

    /// Assigns the next sequence number to `message` and stores it,
    /// evicting the oldest entries when the log is at capacity.
    ///
    /// The sequence is reserved before the write lock is taken, so
    /// concurrent appenders may insert out of arrival order; the BTreeMap
    /// keeps reads ordered by sequence regardless.
    pub async fn append(&self, message: StreamMessage) -> Result<u64> {
        let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
        let entry = MutationEntry {
            seq,
            message,
            assigned_by: self.node_id,
        };
        let mut log = self.log.write().await;
        // Make room before inserting so the map never exceeds capacity.
        while log.len() >= self.max_capacity {
            if let Some((&oldest_seq, _)) = log.iter().next() {
                log.remove(&oldest_seq);
                debug!(oldest_seq, "Evicted oldest mutation log entry");
            } else {
                break;
            }
        }
        log.insert(seq, entry);
        Ok(seq)
    }

    /// Advances the committed watermark to `seq` if it is higher.
    ///
    /// Implemented with `fetch_max` so concurrent committers can never
    /// move the watermark backwards; the previous load-then-store pair
    /// could lose a higher commit in a race between the load and store.
    pub fn commit_up_to(&self, seq: u64) {
        self.committed_seq.fetch_max(seq, Ordering::SeqCst);
    }

    /// Entries with sequence numbers in `from..=to` (inclusive), in order.
    /// Evicted entries are silently absent.
    pub async fn read_range(&self, from: u64, to: u64) -> Vec<MutationEntry> {
        let log = self.log.read().await;
        log.range(from..=to).map(|(_, e)| e.clone()).collect()
    }

    /// Highest committed sequence number (0 if nothing committed).
    pub fn committed_seq(&self) -> u64 {
        self.committed_seq.load(Ordering::SeqCst)
    }

    /// The sequence number the next `append` will receive.
    pub fn next_seq(&self) -> u64 {
        self.next_seq.load(Ordering::SeqCst)
    }

    /// Number of retained (non-evicted) entries.
    pub async fn len(&self) -> usize {
        self.log.read().await.len()
    }

    /// True when no entries are retained.
    pub async fn is_empty(&self) -> bool {
        self.log.read().await.is_empty()
    }
}
/// The most recent offset a node reported for a stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeOffset {
    /// Reporting node.
    pub node_id: u64,
    /// Last reported offset.
    pub offset: u64,
    /// Wall-clock time of the report, in ms since the Unix epoch.
    pub updated_at_ms: u64,
}
/// Tracks per-node consumption offsets across streams to answer
/// min/max/lag queries.
pub struct StreamSyncCoordinator {
    // Latest offset report per (stream id, node id) pair.
    offsets: Arc<RwLock<HashMap<(String, u64), NodeOffset>>>,
    // Every stream id ever reported (never pruned in this file).
    streams: Arc<RwLock<HashSet<String>>>,
}
impl StreamSyncCoordinator {
    /// Creates an empty coordinator with no known streams or offsets.
    pub fn new() -> Self {
        Self {
            offsets: Arc::new(RwLock::new(HashMap::new())),
            streams: Arc::new(RwLock::new(HashSet::new())),
        }
    }

    /// Records (or overwrites) the latest offset reported by `node_id` for
    /// `stream_id`, stamped with the current wall-clock time.
    pub async fn report_offset(&self, stream_id: impl Into<String>, node_id: u64, offset: u64) {
        let stream_id = stream_id.into();
        let now_ms = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or(Duration::ZERO)
            .as_millis() as u64;
        {
            // Short scope so the two locks are never held simultaneously.
            let mut streams = self.streams.write().await;
            streams.insert(stream_id.clone());
        }
        let mut offsets = self.offsets.write().await;
        offsets.insert(
            (stream_id, node_id),
            NodeOffset {
                node_id,
                offset,
                updated_at_ms: now_ms,
            },
        );
    }

    /// Lowest offset any node has reported for `stream_id`, or `None` when
    /// no node has reported.
    pub async fn min_offset(&self, stream_id: &str) -> Option<u64> {
        let offsets = self.offsets.read().await;
        // Fold min directly over the iterator; the previous version
        // collected into an intermediate Vec for no benefit.
        offsets
            .iter()
            .filter(|((s, _), _)| s == stream_id)
            .map(|(_, no)| no.offset)
            .min()
    }

    /// Highest offset any node has reported for `stream_id`, or `None` when
    /// no node has reported.
    pub async fn max_offset(&self, stream_id: &str) -> Option<u64> {
        let offsets = self.offsets.read().await;
        offsets
            .iter()
            .filter(|((s, _), _)| s == stream_id)
            .map(|(_, no)| no.offset)
            .max()
    }

    /// Spread between the fastest and slowest node on `stream_id`.
    ///
    /// Computed under a single read lock so min and max come from the same
    /// snapshot; calling `min_offset`/`max_offset` separately (as before)
    /// could interleave with a writer and report an inconsistent lag.
    pub async fn lag(&self, stream_id: &str) -> Option<u64> {
        let offsets = self.offsets.read().await;
        let mut bounds: Option<(u64, u64)> = None;
        for offset in offsets
            .iter()
            .filter(|((s, _), _)| s == stream_id)
            .map(|(_, no)| no.offset)
        {
            bounds = Some(match bounds {
                Some((lo, hi)) => (lo.min(offset), hi.max(offset)),
                None => (offset, offset),
            });
        }
        bounds.map(|(lo, hi)| hi.saturating_sub(lo))
    }

    /// All stream ids ever reported, in arbitrary order.
    pub async fn streams(&self) -> Vec<String> {
        self.streams.read().await.iter().cloned().collect()
    }

    /// Per-node offset records for `stream_id`, in arbitrary order.
    pub async fn node_offsets_for(&self, stream_id: &str) -> Vec<NodeOffset> {
        let offsets = self.offsets.read().await;
        offsets
            .iter()
            .filter(|((s, _), _)| s == stream_id)
            .map(|(_, no)| no.clone())
            .collect()
    }
}
impl Default for StreamSyncCoordinator {
    /// Equivalent to `StreamSyncCoordinator::new()`.
    fn default() -> Self {
        Self::new()
    }
}
/// A persisted consumption position for one node on one stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Checkpoint {
    /// Stream the checkpoint refers to.
    pub stream_id: String,
    /// Node that owns the checkpoint.
    pub node_id: u64,
    /// Offset recorded at checkpoint time.
    pub offset: u64,
    /// Wall-clock save time, in ms since the Unix epoch.
    pub checkpoint_ms: u64,
    /// Monotonic version used by `save` for conflict detection.
    pub version: u64,
}
/// In-memory checkpoint store keyed by (stream id, node id), with a
/// monotonically increasing version counter for conflict detection.
pub struct StreamingCheckpointer {
    // Latest checkpoint per (stream, node) key.
    checkpoints: Arc<RwLock<HashMap<(String, u64), Checkpoint>>>,
    // Source of versions handed out by save(); starts at 1.
    version_counter: AtomicU64,
}
impl StreamingCheckpointer {
    /// Creates an empty checkpoint store; versions start at 1.
    pub fn new() -> Self {
        Self {
            checkpoints: Arc::new(RwLock::new(HashMap::new())),
            version_counter: AtomicU64::new(1),
        }
    }

    /// Persists a checkpoint for (`stream_id`, `node_id`) at `offset`.
    ///
    /// Each save draws a fresh monotonically increasing version; the save
    /// is rejected when a concurrent save already stored a higher version
    /// for the same key (higher-version-wins).
    ///
    /// # Errors
    /// Returns a config error describing the version conflict.
    pub async fn save(
        &self,
        stream_id: impl Into<String>,
        node_id: u64,
        offset: u64,
    ) -> Result<Checkpoint> {
        let stream_id = stream_id.into();
        let version = self.version_counter.fetch_add(1, Ordering::SeqCst);
        let now_ms = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or(Duration::ZERO)
            .as_millis() as u64;
        let cp = Checkpoint {
            stream_id: stream_id.clone(),
            node_id,
            offset,
            checkpoint_ms: now_ms,
            version,
        };
        let mut cps = self.checkpoints.write().await;
        let key = (stream_id.clone(), node_id);
        if let Some(existing) = cps.get(&key) {
            if existing.version > version {
                return Err(ClusterError::Config(format!(
                    "Checkpoint version conflict: existing {} > new {}",
                    existing.version, version
                )));
            }
        }
        cps.insert(key, cp.clone());
        debug!(stream_id = %stream_id, node_id, offset, version, "Checkpoint saved");
        Ok(cp)
    }

    /// Latest checkpoint for (`stream_id`, `node_id`), if any.
    pub async fn load(&self, stream_id: &str, node_id: u64) -> Option<Checkpoint> {
        let cps = self.checkpoints.read().await;
        // Tuple keys force an owned String for the lookup; (&str, u64)
        // cannot borrow against the (String, u64) key type.
        cps.get(&(stream_id.to_string(), node_id)).cloned()
    }

    /// All checkpoints owned by `node_id`, across streams.
    pub async fn checkpoints_for_node(&self, node_id: u64) -> Vec<Checkpoint> {
        let cps = self.checkpoints.read().await;
        cps.values()
            .filter(|c| c.node_id == node_id)
            .cloned()
            .collect()
    }

    /// All checkpoints for `stream_id`, across nodes.
    pub async fn checkpoints_for_stream(&self, stream_id: &str) -> Vec<Checkpoint> {
        let cps = self.checkpoints.read().await;
        cps.values()
            .filter(|c| c.stream_id == stream_id)
            .cloned()
            .collect()
    }

    /// Merges externally persisted checkpoints into the store.
    ///
    /// An incoming checkpoint only replaces an existing entry when its
    /// version is strictly higher, so a stale restore cannot clobber a
    /// newer checkpoint (matching the conflict rule enforced by `save`;
    /// the previous implementation overwrote unconditionally). The version
    /// counter is also advanced past every restored version so subsequent
    /// saves cannot collide with restored entries.
    pub async fn restore(&self, checkpoints: Vec<Checkpoint>) {
        let mut cps = self.checkpoints.write().await;
        for cp in checkpoints {
            // Keep future save() versions strictly above restored ones.
            self.version_counter
                .fetch_max(cp.version.saturating_add(1), Ordering::SeqCst);
            let key = (cp.stream_id.clone(), cp.node_id);
            match cps.get(&key) {
                Some(existing) if existing.version >= cp.version => {
                    debug!(
                        stream_id = %key.0,
                        node_id = key.1,
                        "Skipped stale checkpoint during restore"
                    );
                }
                _ => {
                    cps.insert(key, cp);
                }
            }
        }
        info!("Checkpoint store restored");
    }

    /// Number of (stream, node) checkpoint entries currently stored.
    pub async fn count(&self) -> usize {
        self.checkpoints.read().await.len()
    }
}
impl Default for StreamingCheckpointer {
    /// Equivalent to `StreamingCheckpointer::new()`.
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed triple reused by the tests below.
    fn sample_triple() -> StreamTriple {
        StreamTriple::new(
            "http://example.org/subject",
            "http://example.org/predicate",
            "\"hello\"",
        )
    }

    /// An `Insert` message on a fixed stream id at the given offset.
    fn insert_msg(offset: u64) -> StreamMessage {
        StreamMessage::insert("stream-rdf", offset, vec![sample_triple()])
    }

    // Accepted messages accumulate in the buffer and drain as one batch
    // (the default batch_size of 500 covers both messages).
    #[tokio::test]
    async fn test_consumer_submit_and_drain() {
        let consumer = StreamingTripleConsumer::new(ConsumerConfig::default()).expect("new");
        consumer.start();
        consumer.submit(insert_msg(1)).await.expect("submit 1");
        consumer.submit(insert_msg(2)).await.expect("submit 2");
        assert_eq!(consumer.buffer_len().await, 2);
        let batch = consumer.drain_batch().await;
        assert_eq!(batch.len(), 2);
        assert_eq!(consumer.buffer_len().await, 0);
    }

    // submit() must reject the third message once max_buffer_size (2) is hit.
    #[tokio::test]
    async fn test_consumer_buffer_overflow() {
        let cfg = ConsumerConfig {
            max_buffer_size: 2,
            batch_size: 10,
            ..Default::default()
        };
        let consumer = StreamingTripleConsumer::new(cfg).expect("new");
        consumer.submit(insert_msg(1)).await.expect("1");
        consumer.submit(insert_msg(2)).await.expect("2");
        assert!(consumer.submit(insert_msg(3)).await.is_err());
    }

    // One accepted insert bumps the message and inserted-triple counters.
    #[tokio::test]
    async fn test_consumer_stats_tracking() {
        let consumer = StreamingTripleConsumer::new(ConsumerConfig::default()).expect("new");
        consumer.submit(insert_msg(1)).await.expect("submit");
        assert_eq!(
            consumer.stats().messages_consumed.load(Ordering::Relaxed),
            1
        );
        assert_eq!(consumer.stats().triples_inserted.load(Ordering::Relaxed), 1);
    }

    // An empty subject fails StreamTriple::validate, so submit must error.
    #[tokio::test]
    async fn test_consumer_validation_rejects_empty_triple() {
        let consumer = StreamingTripleConsumer::new(ConsumerConfig {
            validate_triples: true,
            ..Default::default()
        })
        .expect("new");
        let bad_triple = StreamTriple::new("", "http://pred", "obj");
        let msg = StreamMessage::insert("s", 1, vec![bad_triple]);
        assert!(consumer.submit(msg).await.is_err());
    }

    // Sequence numbers are consecutive and read_range is inclusive.
    #[tokio::test]
    async fn test_mutation_log_append_and_read() {
        let log = StreamingMutationLog::new(1, 1000).expect("new");
        let seq1 = log.append(insert_msg(1)).await.expect("append 1");
        let seq2 = log.append(insert_msg(2)).await.expect("append 2");
        assert_eq!(seq2, seq1 + 1);
        let entries = log.read_range(seq1, seq2).await;
        assert_eq!(entries.len(), 2);
    }

    // committed_seq starts at 0 and advances to the committed sequence.
    #[tokio::test]
    async fn test_mutation_log_commit() {
        let log = StreamingMutationLog::new(1, 1000).expect("new");
        let seq = log.append(insert_msg(10)).await.expect("append");
        assert_eq!(log.committed_seq(), 0);
        log.commit_up_to(seq);
        assert_eq!(log.committed_seq(), seq);
    }

    // A capacity-3 log never retains more than 3 of the 5 appended entries.
    #[tokio::test]
    async fn test_mutation_log_capacity_eviction() {
        let log = StreamingMutationLog::new(1, 3).expect("new");
        for i in 0..5 {
            log.append(insert_msg(i)).await.expect("append");
        }
        assert!(log.len().await <= 3);
    }

    // min/max/lag reflect the three reported node offsets (100, 200, 150).
    #[tokio::test]
    async fn test_sync_coordinator_offsets() {
        let coord = StreamSyncCoordinator::new();
        coord.report_offset("rdf-stream", 1, 100).await;
        coord.report_offset("rdf-stream", 2, 200).await;
        coord.report_offset("rdf-stream", 3, 150).await;
        assert_eq!(coord.min_offset("rdf-stream").await, Some(100));
        assert_eq!(coord.max_offset("rdf-stream").await, Some(200));
        assert_eq!(coord.lag("rdf-stream").await, Some(100));
    }

    // Queries on a stream nobody reported return None.
    #[tokio::test]
    async fn test_sync_coordinator_unknown_stream() {
        let coord = StreamSyncCoordinator::new();
        assert!(coord.min_offset("nonexistent").await.is_none());
        assert!(coord.lag("nonexistent").await.is_none());
    }

    // A saved checkpoint round-trips through load with the same fields.
    #[tokio::test]
    async fn test_checkpointer_save_load() {
        let cp = StreamingCheckpointer::new();
        let saved = cp.save("my-stream", 42, 999).await.expect("save");
        assert_eq!(saved.offset, 999);
        assert_eq!(saved.node_id, 42);
        let loaded = cp.load("my-stream", 42).await.expect("load");
        assert_eq!(loaded.offset, 999);
    }

    // restore() loads externally supplied checkpoints into an empty store.
    #[tokio::test]
    async fn test_checkpointer_restore() {
        let cp = StreamingCheckpointer::new();
        let checkpoints = vec![
            Checkpoint {
                stream_id: "s1".into(),
                node_id: 1,
                offset: 50,
                checkpoint_ms: 0,
                version: 1,
            },
            Checkpoint {
                stream_id: "s2".into(),
                node_id: 1,
                offset: 75,
                checkpoint_ms: 0,
                version: 2,
            },
        ];
        cp.restore(checkpoints).await;
        assert_eq!(cp.count().await, 2);
        let for_node = cp.checkpoints_for_node(1).await;
        assert_eq!(for_node.len(), 2);
    }
}