use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::{debug, warn};
use crate::consensus::ConsensusManager;
use crate::stream_integration::StreamMessage;
use crate::streaming::backpressure_bridge::{BackpressureBridge, BackpressureSignal};
use crate::streaming::event_mapper::{EventMapper, MapperError};
/// Failures a [`StreamSink`] can report when asked to write a batch.
#[derive(Debug, Error)]
pub enum SinkError {
    /// The sink requires leadership and the local consensus node reported
    /// that it is not the leader.
    #[error("local node is not the leader; cannot accept stream batch")]
    NotLeader,

    /// The backpressure bridge reported `Stop` and the sink is configured to
    /// honor it.
    #[error("cluster backpressure is Stop; refusing batch")]
    BackpressureStopped,

    /// Translating stream events into commands failed; the mapper's error is
    /// surfaced unchanged.
    #[error(transparent)]
    Mapping(#[from] MapperError),

    /// Proposing a command through consensus failed. Carries the stringified
    /// underlying error.
    #[error("consensus error: {0}")]
    Consensus(String),
}
/// Convenience alias for results whose error type is [`SinkError`].
pub type SinkResult<T> = std::result::Result<T, SinkError>;
/// A destination that accepts batches of [`StreamMessage`]s.
///
/// Implementors must be `Send + Sync` so a sink can be shared across tasks.
#[async_trait]
pub trait StreamSink: Send + Sync {
    /// Writes one batch of stream events to the sink.
    ///
    /// # Errors
    ///
    /// Returns a [`SinkError`] when the sink refuses or fails to process the
    /// batch.
    async fn write_batch(&self, events: Vec<StreamMessage>) -> SinkResult<()>;

    /// Reports the sink's current backpressure signal.
    ///
    /// The provided implementation always answers
    /// [`BackpressureSignal::Continue`]; sinks that track load override it.
    fn backpressure_signal(&self) -> BackpressureSignal {
        BackpressureSignal::Continue
    }
}
/// Tunables for [`ClusterSink`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterSinkConfig {
    /// When `true`, `write_batch` refuses batches while the backpressure
    /// bridge reports `Stop`.
    pub honor_backpressure_stop: bool,

    /// Upper bound on the number of commands proposed per batch. Batches that
    /// map to more commands are truncated to this length; `0` disables the cap.
    pub max_batch_commands: usize,

    /// When `true`, `write_batch` refuses batches unless the local consensus
    /// node reports itself as leader.
    pub require_leader: bool,
}
impl Default for ClusterSinkConfig {
    /// Defaults: honor `Stop` backpressure, require leadership, and cap each
    /// batch at 4096 commands.
    fn default() -> Self {
        let max_batch_commands = 4_096;
        Self {
            honor_backpressure_stop: true,
            max_batch_commands,
            require_leader: true,
        }
    }
}
/// Monotonic counters describing what a [`ClusterSink`] has done.
///
/// Every field is an atomic so the counters can be bumped through a shared
/// reference; `Default` starts them all at zero.
#[derive(Debug, Default)]
pub struct ClusterSinkStats {
    /// Batches handed to `write_batch`, counted before any admission check.
    pub batches_received: AtomicU64,

    /// Batches that ran to completion with no failed command.
    pub batches_committed: AtomicU64,

    /// Batches refused because the local node was not the leader.
    pub batches_rejected_not_leader: AtomicU64,

    /// Batches refused because backpressure was `Stop`.
    pub batches_rejected_backpressure: AtomicU64,

    /// Commands whose proposal returned a non-error response.
    pub commands_committed: AtomicU64,

    /// Commands whose proposal failed or returned an error response.
    pub commands_failed: AtomicU64,
}
/// A [`StreamSink`] that turns stream events into commands and proposes them
/// through the cluster's consensus layer.
pub struct ClusterSink {
    /// Consensus handle used for the leadership check and command proposals.
    consensus: Arc<ConsensusManager>,

    /// Translates incoming stream events into consensus commands.
    mapper: Arc<dyn EventMapper>,

    /// Source of the backpressure signal; also told how many commands are
    /// being proposed.
    bridge: BackpressureBridge,

    /// Admission and batching policy.
    config: ClusterSinkConfig,

    /// Shared counters describing the sink's activity.
    stats: Arc<ClusterSinkStats>,
}
impl ClusterSink {
    /// Creates a sink that uses a default-constructed [`BackpressureBridge`]
    /// and zeroed statistics.
    pub fn new(
        consensus: Arc<ConsensusManager>,
        mapper: Arc<dyn EventMapper>,
        config: ClusterSinkConfig,
    ) -> Self {
        Self::with_bridge(consensus, mapper, BackpressureBridge::default(), config)
    }

    /// Creates a sink around a caller-supplied [`BackpressureBridge`], with
    /// zeroed statistics.
    pub fn with_bridge(
        consensus: Arc<ConsensusManager>,
        mapper: Arc<dyn EventMapper>,
        bridge: BackpressureBridge,
        config: ClusterSinkConfig,
    ) -> Self {
        let stats = Arc::new(ClusterSinkStats::default());
        Self {
            consensus,
            mapper,
            bridge,
            config,
            stats,
        }
    }

    /// Returns a clone of the sink's backpressure bridge.
    ///
    /// NOTE(review): the tests in this file call `observe` on one clone and
    /// read `signal` from another, so clones presumably share state — confirm
    /// against `BackpressureBridge`.
    pub fn bridge(&self) -> BackpressureBridge {
        Clone::clone(&self.bridge)
    }

    /// Borrows the shared statistics counters.
    pub fn stats(&self) -> &Arc<ClusterSinkStats> {
        &self.stats
    }

    /// Borrows the configuration the sink was built with.
    pub fn config(&self) -> &ClusterSinkConfig {
        &self.config
    }

    /// Borrows the consensus manager the sink proposes through.
    pub fn consensus(&self) -> &Arc<ConsensusManager> {
        &self.consensus
    }
}
#[async_trait]
impl StreamSink for ClusterSink {
async fn write_batch(&self, events: Vec<StreamMessage>) -> SinkResult<()> {
self.stats.batches_received.fetch_add(1, Ordering::Relaxed);
if self.config.honor_backpressure_stop
&& matches!(self.bridge.signal(), BackpressureSignal::Stop)
{
self.stats
.batches_rejected_backpressure
.fetch_add(1, Ordering::Relaxed);
return Err(SinkError::BackpressureStopped);
}
if self.config.require_leader && !self.consensus.is_leader().await {
self.stats
.batches_rejected_not_leader
.fetch_add(1, Ordering::Relaxed);
return Err(SinkError::NotLeader);
}
let mut commands = self.mapper.map_batch(&events)?;
if self.config.max_batch_commands > 0 && commands.len() > self.config.max_batch_commands {
warn!(
command_count = commands.len(),
cap = self.config.max_batch_commands,
"ClusterSink batch exceeds max_batch_commands; truncating"
);
commands.truncate(self.config.max_batch_commands);
}
let cmd_count = commands.len() as u64;
debug!(
event_count = events.len(),
command_count = cmd_count,
"ClusterSink proposing batch"
);
let _ = self.bridge.add(cmd_count);
let mut errors = 0u64;
for cmd in commands.into_iter() {
match self.consensus.propose_command(cmd).await {
Ok(resp) => {
if let crate::raft::RdfResponse::Error(msg) = resp {
warn!(?msg, "ClusterSink: command apply returned error");
errors += 1;
} else {
self.stats
.commands_committed
.fetch_add(1, Ordering::Relaxed);
}
}
Err(e) => {
let _ = self.bridge.sub(1);
self.stats.commands_failed.fetch_add(1, Ordering::Relaxed);
return Err(SinkError::Consensus(e.to_string()));
}
}
}
let _ = self.bridge.sub(cmd_count);
self.stats
.commands_failed
.fetch_add(errors, Ordering::Relaxed);
if errors == 0 {
self.stats.batches_committed.fetch_add(1, Ordering::Relaxed);
}
Ok(())
}
fn backpressure_signal(&self) -> BackpressureSignal {
self.bridge.signal()
}
}
/// Returns `true` for every backpressure signal other than
/// [`BackpressureSignal::Continue`].
pub fn should_pause(signal: BackpressureSignal) -> bool {
    let may_continue = matches!(signal, BackpressureSignal::Continue);
    !may_continue
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::raft::{init_global_shared_storage, reset_global_shared_storage};
    use crate::stream_integration::{StreamMessage, StreamTriple};
    use crate::streaming::event_mapper::DefaultEventMapper;

    /// Held for the duration of each async test so they run one at a time.
    /// NOTE(review): presumably needed because the storage initialised below
    /// is process-global — confirm against `crate::raft`.
    static TEST_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());

    /// Takes the test lock, then initialises and resets the shared storage.
    /// The returned guard must stay alive until the test is done.
    async fn fresh_storage() -> tokio::sync::MutexGuard<'static, ()> {
        let guard = TEST_LOCK.lock().await;
        init_global_shared_storage();
        reset_global_shared_storage().await;
        guard
    }

    /// Builds a peerless sink for `node_id` with the given configuration.
    fn sink_with_config(node_id: u64, config: ClusterSinkConfig) -> ClusterSink {
        let consensus = Arc::new(ConsensusManager::new(node_id, vec![]));
        let mapper = Arc::new(DefaultEventMapper::default());
        ClusterSink::new(consensus, mapper, config)
    }

    /// Builds a peerless sink for `node_id` with the default configuration.
    fn make_sink(node_id: u64) -> ClusterSink {
        sink_with_config(node_id, ClusterSinkConfig::default())
    }

    /// Produces a distinct triple for index `i`.
    fn triple(i: usize) -> StreamTriple {
        let subject = format!("http://example.org/s/{i}");
        let object = format!("\"value-{i}\"");
        StreamTriple::new(subject, "http://example.org/p/has", object)
    }

    /// Reads a statistics counter.
    fn loaded(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    #[tokio::test]
    async fn write_batch_proposes_through_consensus() {
        let _guard = fresh_storage().await;
        let sink = make_sink(1);

        let batch = vec![StreamMessage::insert(
            "rdf-stream",
            1,
            vec![triple(0), triple(1)],
        )];
        sink.write_batch(batch).await.expect("commit");

        assert_eq!(loaded(&sink.stats().batches_committed), 1);
        assert_eq!(loaded(&sink.stats().commands_committed), 2);
        assert_eq!(sink.consensus().len().await, 2);
    }

    #[tokio::test]
    async fn write_batch_respects_backpressure_stop() {
        let _guard = fresh_storage().await;
        let sink = make_sink(2);

        // Push the bridge into `Stop` before offering any work.
        let _ = sink.bridge().observe(10_000_000);
        assert_eq!(sink.bridge().signal(), BackpressureSignal::Stop);

        let batch = vec![StreamMessage::insert("rdf-stream", 1, vec![triple(0)])];
        let err = sink.write_batch(batch).await.expect_err("should refuse");

        assert!(matches!(err, SinkError::BackpressureStopped));
        assert_eq!(loaded(&sink.stats().batches_rejected_backpressure), 1);
    }

    #[tokio::test]
    async fn write_batch_truncates_oversized() {
        let _guard = fresh_storage().await;
        let sink = sink_with_config(
            3,
            ClusterSinkConfig {
                max_batch_commands: 2,
                ..Default::default()
            },
        );

        let triples: Vec<_> = (0..5).map(triple).collect();
        let batch = vec![StreamMessage::insert("rdf-stream", 1, triples)];
        sink.write_batch(batch).await.expect("commit");

        assert_eq!(loaded(&sink.stats().commands_committed), 2);
    }

    #[tokio::test]
    async fn write_batch_handles_empty_input() {
        let _guard = fresh_storage().await;
        let sink = make_sink(4);

        sink.write_batch(vec![]).await.expect("ok");

        assert_eq!(loaded(&sink.stats().batches_committed), 1);
    }

    #[tokio::test]
    async fn backpressure_signal_default_is_continue() {
        let _guard = fresh_storage().await;
        let sink = make_sink(5);

        assert_eq!(sink.backpressure_signal(), BackpressureSignal::Continue);
    }

    #[test]
    fn should_pause_flags_non_continue() {
        assert!(!should_pause(BackpressureSignal::Continue));
        assert!(should_pause(BackpressureSignal::Slow));
        assert!(should_pause(BackpressureSignal::Stop));
    }

    #[tokio::test]
    async fn require_leader_false_proposes_anyway() {
        let _guard = fresh_storage().await;
        let sink = sink_with_config(
            6,
            ClusterSinkConfig {
                require_leader: false,
                ..Default::default()
            },
        );

        let batch = vec![StreamMessage::insert("rdf-stream", 1, vec![triple(7)])];
        sink.write_batch(batch).await.expect("ok");

        assert_eq!(loaded(&sink.stats().commands_committed), 1);
    }
}