use crossbeam_channel::{bounded, Receiver, Sender, TrySendError};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::{Duration, Instant};
use super::StreamRecord;
use crate::error::{Error, Result};
use crate::{lock_safe, read_lock_safe, write_lock_safe};
/// Policy applied to incoming records once the buffer/channel reaches its
/// configured high watermark.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureStrategy {
    /// Reject the push so the producer retries (with a timeout); no data loss.
    Block,
    /// Evict the oldest buffered records to make room for the new one.
    DropOldest,
    /// Discard the incoming record and keep what is already buffered.
    DropNewest,
    /// Probabilistically drop records, decaying the sampling rate under
    /// pressure and recovering it when the buffer drains.
    AdaptiveSampling,
    /// Admit records via a token bucket at a configured records/sec rate.
    RateLimiting,
}
impl Default for BackpressureStrategy {
fn default() -> Self {
BackpressureStrategy::Block
}
}
/// Tuning parameters for backpressure handling.
#[derive(Debug, Clone)]
pub struct BackpressureConfig {
    /// Buffered-record count at which backpressure engages.
    pub high_watermark: usize,
    /// Buffered-record count at which backpressure disengages (hysteresis).
    pub low_watermark: usize,
    /// What to do with incoming records while backpressure is active.
    pub strategy: BackpressureStrategy,
    /// How long a blocking push may wait/retry before erroring out.
    pub block_timeout: Duration,
    /// Records/sec for `RateLimiting`; `None` falls back to 1000.0.
    pub rate_limit: Option<f64>,
    /// Lower bound for the adaptive sampling rate, in 0.0..=1.0.
    pub min_sampling_rate: f64,
}
impl Default for BackpressureConfig {
    /// Defaults: engage backpressure at 10k buffered records, release at 5k,
    /// block the producer for up to 30 s, no rate limit, 10% sampling floor.
    fn default() -> Self {
        Self {
            high_watermark: 10_000,
            low_watermark: 5_000,
            strategy: BackpressureStrategy::default(),
            block_timeout: Duration::from_secs(30),
            rate_limit: None,
            min_sampling_rate: 0.1,
        }
    }
}
/// Fluent builder for [`BackpressureConfig`], starting from the defaults.
pub struct BackpressureConfigBuilder {
    // Accumulates the configuration as setter methods are chained.
    config: BackpressureConfig,
}
impl BackpressureConfigBuilder {
    /// Starts a builder pre-populated with `BackpressureConfig::default()`.
    pub fn new() -> Self {
        Self {
            config: BackpressureConfig::default(),
        }
    }

    /// Sets the buffer size at which backpressure engages.
    pub fn high_watermark(mut self, limit: usize) -> Self {
        self.config.high_watermark = limit;
        self
    }

    /// Sets the buffer size at which backpressure disengages.
    pub fn low_watermark(mut self, limit: usize) -> Self {
        self.config.low_watermark = limit;
        self
    }

    /// Chooses the strategy used once the high watermark is reached.
    pub fn strategy(mut self, choice: BackpressureStrategy) -> Self {
        self.config.strategy = choice;
        self
    }

    /// Sets the maximum time a blocking push may wait for space.
    pub fn block_timeout(mut self, duration: Duration) -> Self {
        self.config.block_timeout = duration;
        self
    }

    /// Enables rate limiting at `records_per_sec`.
    pub fn rate_limit(mut self, records_per_sec: f64) -> Self {
        self.config.rate_limit = Some(records_per_sec);
        self
    }

    /// Sets the adaptive-sampling floor; the value is clamped into 0.0..=1.0.
    pub fn min_sampling_rate(mut self, fraction: f64) -> Self {
        self.config.min_sampling_rate = fraction.clamp(0.0, 1.0);
        self
    }

    /// Consumes the builder, yielding the finished configuration.
    pub fn build(self) -> BackpressureConfig {
        self.config
    }
}
impl Default for BackpressureConfigBuilder {
fn default() -> Self {
Self::new()
}
}
/// Snapshot of backpressure counters, cloned out by `stats()` accessors.
#[derive(Debug, Clone)]
pub struct BackpressureStats {
    /// Total records offered to the buffer/channel.
    pub records_received: u64,
    /// Records discarded by a drop/sampling/rate-limit decision.
    pub records_dropped: u64,
    /// Records successfully dequeued by consumers.
    pub records_processed: u64,
    /// Buffer occupancy at the time of the last update.
    pub current_buffer_size: usize,
    /// Number of times the high watermark was hit.
    pub backpressure_events: u64,
    /// Current adaptive sampling rate (1.0 = keep everything).
    pub current_sampling_rate: f64,
    /// Average latency in milliseconds.
    /// NOTE(review): nothing in this file ever updates this field — confirm
    /// whether a writer exists elsewhere or the field is dead.
    pub avg_latency_ms: f64,
}
impl Default for BackpressureStats {
fn default() -> Self {
BackpressureStats {
records_received: 0,
records_dropped: 0,
records_processed: 0,
current_buffer_size: 0,
backpressure_events: 0,
current_sampling_rate: 1.0,
avg_latency_ms: 0.0,
}
}
}
/// In-memory FIFO buffer that applies a [`BackpressureStrategy`] when it
/// reaches the configured high watermark.
#[derive(Debug)]
pub struct BackpressureBuffer {
    // Watermarks, strategy, timeout, and sampling/rate-limit parameters.
    config: BackpressureConfig,
    // The FIFO of pending records, shared across producer/consumer threads.
    buffer: Arc<RwLock<VecDeque<StreamRecord>>>,
    // Set when the high watermark is hit; cleared below the low watermark.
    backpressure_active: Arc<AtomicBool>,
    // Counters exposed via `stats()`.
    stats: Arc<RwLock<BackpressureStats>>,
    // Current adaptive sampling rate in 0.0..=1.0 (starts at 1.0).
    current_sampling_rate: Arc<RwLock<f64>>,
    // Token bucket used by the `RateLimiting` strategy.
    rate_limiter: Arc<Mutex<RateLimiterState>>,
}
/// Token-bucket state: `tokens` refill continuously at `rate` tokens/sec,
/// capped at `rate` (i.e. at most one second of burst).
#[derive(Debug)]
struct RateLimiterState {
    // Currently available tokens; one token admits one record.
    tokens: f64,
    // Timestamp of the last refill, used to compute elapsed-time credit.
    last_refill: Instant,
    // Refill rate and bucket capacity, in records per second.
    rate: f64,
}
impl BackpressureBuffer {
    /// Creates a buffer using `config`. The token bucket for the
    /// `RateLimiting` strategy starts full at the configured rate
    /// (1000.0 records/sec when `rate_limit` is `None`).
    pub fn new(config: BackpressureConfig) -> Self {
        let rate = config.rate_limit.unwrap_or(1000.0);
        BackpressureBuffer {
            // Pre-size to the high watermark so steady-state pushes never
            // reallocate while the write lock is held.
            buffer: Arc::new(RwLock::new(VecDeque::with_capacity(config.high_watermark))),
            backpressure_active: Arc::new(AtomicBool::new(false)),
            stats: Arc::new(RwLock::new(BackpressureStats::default())),
            current_sampling_rate: Arc::new(RwLock::new(1.0)),
            rate_limiter: Arc::new(Mutex::new(RateLimiterState {
                tokens: rate,
                last_refill: Instant::now(),
                rate,
            })),
            config,
        }
    }

    /// Attempts to enqueue `record`, applying the configured strategy once
    /// the buffer has reached the high watermark.
    ///
    /// Returns `Ok(true)` when the record was enqueued and `Ok(false)` when
    /// it was rejected (blocked or dropped). Counters are updated on every
    /// call, so under the `Block` strategy each retry from `push` is counted
    /// in `records_received`.
    ///
    /// Lock ordering: `buffer` is acquired before `stats`, matching
    /// `pop`/`pop_batch`. (The previous implementation locked `stats` first
    /// and `buffer` second, which could deadlock against a concurrent `pop`
    /// that holds `buffer` while waiting for `stats`.)
    pub fn try_push(&self, record: StreamRecord) -> Result<bool> {
        let mut buffer = write_lock_safe!(self.buffer, "backpressure buffer write")?;
        let current_size = buffer.len();
        let mut stats = write_lock_safe!(self.stats, "backpressure stats write")?;
        stats.records_received += 1;
        stats.current_buffer_size = current_size;
        if current_size >= self.config.high_watermark {
            self.backpressure_active.store(true, Ordering::SeqCst);
            stats.backpressure_events += 1;
            return match self.config.strategy {
                // Reject; `push` handles the retry loop.
                BackpressureStrategy::Block => Ok(false),
                BackpressureStrategy::DropOldest => {
                    // Evict from the front until there is room, then enqueue.
                    while buffer.len() >= self.config.high_watermark {
                        buffer.pop_front();
                        stats.records_dropped += 1;
                    }
                    buffer.push_back(record);
                    Ok(true)
                }
                BackpressureStrategy::DropNewest => {
                    stats.records_dropped += 1;
                    Ok(false)
                }
                BackpressureStrategy::AdaptiveSampling => {
                    // Decay the sampling rate multiplicatively, bounded below
                    // by the configured minimum.
                    let mut rate =
                        write_lock_safe!(self.current_sampling_rate, "sampling rate write")?;
                    *rate = (*rate * 0.9).max(self.config.min_sampling_rate);
                    stats.current_sampling_rate = *rate;
                    if should_sample(*rate) {
                        buffer.push_back(record);
                        Ok(true)
                    } else {
                        stats.records_dropped += 1;
                        Ok(false)
                    }
                }
                BackpressureStrategy::RateLimiting => {
                    if self.acquire_token()? {
                        buffer.push_back(record);
                        Ok(true)
                    } else {
                        stats.records_dropped += 1;
                        Ok(false)
                    }
                }
            };
        }
        if current_size < self.config.low_watermark {
            self.backpressure_active.store(false, Ordering::SeqCst);
            if self.config.strategy == BackpressureStrategy::AdaptiveSampling {
                // Recover the sampling rate once pressure subsides.
                let mut rate = write_lock_safe!(self.current_sampling_rate, "sampling rate write")?;
                *rate = (*rate * 1.1).min(1.0);
                stats.current_sampling_rate = *rate;
            }
        }
        buffer.push_back(record);
        Ok(true)
    }

    /// Enqueues `record`. Under the `Block` strategy this retries every 10 ms
    /// until success or until `block_timeout` elapses; for all other
    /// strategies it is a single `try_push` whose drop outcome is accepted.
    ///
    /// Each retry clones the record because `try_push` consumes it.
    pub fn push(&self, record: StreamRecord) -> Result<()> {
        if self.config.strategy == BackpressureStrategy::Block {
            let start = Instant::now();
            loop {
                if self.try_push(record.clone())? {
                    return Ok(());
                }
                if start.elapsed() > self.config.block_timeout {
                    return Err(Error::IoError("Backpressure timeout".into()));
                }
                thread::sleep(Duration::from_millis(10));
            }
        } else {
            self.try_push(record)?;
            Ok(())
        }
    }

    /// Dequeues the oldest record, if any. Locks `buffer` then `stats` —
    /// the canonical order for this type.
    pub fn pop(&self) -> Result<Option<StreamRecord>> {
        let mut buffer = write_lock_safe!(self.buffer, "backpressure buffer write")?;
        let record = buffer.pop_front();
        if record.is_some() {
            let mut stats = write_lock_safe!(self.stats, "backpressure stats write")?;
            stats.records_processed += 1;
            stats.current_buffer_size = buffer.len();
        }
        Ok(record)
    }

    /// Dequeues up to `max_batch_size` records in FIFO order. Returns an
    /// empty vec when the buffer is empty.
    pub fn pop_batch(&self, max_batch_size: usize) -> Result<Vec<StreamRecord>> {
        let mut buffer = write_lock_safe!(self.buffer, "backpressure buffer write")?;
        let batch_size = max_batch_size.min(buffer.len());
        let mut batch = Vec::with_capacity(batch_size);
        for _ in 0..batch_size {
            if let Some(record) = buffer.pop_front() {
                batch.push(record);
            }
        }
        if !batch.is_empty() {
            let mut stats = write_lock_safe!(self.stats, "backpressure stats write")?;
            stats.records_processed += batch.len() as u64;
            stats.current_buffer_size = buffer.len();
        }
        Ok(batch)
    }

    /// True when no records are buffered.
    pub fn is_empty(&self) -> Result<bool> {
        Ok(read_lock_safe!(self.buffer, "backpressure buffer read")?.is_empty())
    }

    /// Number of records currently buffered.
    pub fn len(&self) -> Result<usize> {
        Ok(read_lock_safe!(self.buffer, "backpressure buffer read")?.len())
    }

    /// True while backpressure is engaged.
    pub fn is_backpressure_active(&self) -> bool {
        self.backpressure_active.load(Ordering::SeqCst)
    }

    /// Snapshot of the buffer statistics.
    pub fn stats(&self) -> Result<BackpressureStats> {
        Ok(read_lock_safe!(self.stats, "backpressure stats read")?.clone())
    }

    /// Resets all statistics to their defaults.
    pub fn reset_stats(&self) -> Result<()> {
        let mut stats = write_lock_safe!(self.stats, "backpressure stats write")?;
        *stats = BackpressureStats::default();
        Ok(())
    }

    /// Token-bucket admission for the `RateLimiting` strategy: refill by
    /// elapsed-time credit (capped at one second's worth), then spend one
    /// token if available. Called while `buffer`/`stats` are held;
    /// `rate_limiter` is last in the lock order and is never acquired first
    /// anywhere in this type, so no cycle is possible.
    fn acquire_token(&self) -> Result<bool> {
        let mut state = lock_safe!(self.rate_limiter, "rate limiter lock")?;
        let now = Instant::now();
        let elapsed = now.duration_since(state.last_refill);
        let new_tokens = elapsed.as_secs_f64() * state.rate;
        state.tokens = (state.tokens + new_tokens).min(state.rate);
        state.last_refill = now;
        if state.tokens >= 1.0 {
            state.tokens -= 1.0;
            Ok(true)
        } else {
            Ok(false)
        }
    }
}
/// Cheap pseudo-random sampling decision: returns `true` with probability
/// approximately `rate`, using the sub-second nanosecond component of the
/// system clock as the entropy source.
///
/// Fixes two defects in the original:
/// - The nanosecond value (range `0..1_000_000_000`) was divided by
///   `u32::MAX` (~4.29e9), so the quotient never exceeded ~0.233 — any rate
///   above that sampled 100% of records and smaller rates were inflated
///   roughly 4.3x. Dividing by 1e9 maps the value onto [0.0, 1.0).
/// - `duration_since(UNIX_EPOCH)` panicked if the clock was set before the
///   epoch; that case now degrades to "always sample" instead of crashing.
fn should_sample(rate: f64) -> bool {
    use std::time::{SystemTime, UNIX_EPOCH};
    const NANOS_PER_SEC: f64 = 1_000_000_000.0;
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.subsec_nanos())
        // Clock before the epoch: fall back to 0 (sample) rather than panic.
        .unwrap_or(0);
    (nanos as f64 / NANOS_PER_SEC) < rate
}
/// Bounded crossbeam channel wrapped with backpressure accounting; channel
/// capacity equals the configured high watermark.
pub struct BackpressureChannel {
    // Producer side of the bounded channel.
    sender: Sender<StreamRecord>,
    // Consumer side of the bounded channel.
    receiver: Receiver<StreamRecord>,
    // Watermarks, strategy, and block timeout.
    config: BackpressureConfig,
    // Counters exposed via `stats()`.
    stats: Arc<RwLock<BackpressureStats>>,
    // Best-effort mirror of channel occupancy, updated around send/recv;
    // may briefly disagree with the channel's true length under races.
    buffer_size: Arc<AtomicUsize>,
    // Set when a DropNewest send hits the high watermark; cleared when a
    // receive drains the channel below the low watermark.
    backpressure_active: Arc<AtomicBool>,
}
impl BackpressureChannel {
    /// Creates a channel whose bounded capacity equals the high watermark.
    pub fn new(config: BackpressureConfig) -> Self {
        let (sender, receiver) = bounded(config.high_watermark);
        BackpressureChannel {
            sender,
            receiver,
            config,
            stats: Arc::new(RwLock::new(BackpressureStats::default())),
            buffer_size: Arc::new(AtomicUsize::new(0)),
            backpressure_active: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Sends `record`, honoring the configured strategy. `Block` waits up to
    /// `block_timeout`; `DropNewest` discards when at the high watermark;
    /// every other strategy falls back to a non-blocking send that drops on
    /// a full channel.
    ///
    /// The stats lock is only taken for short, non-blocking updates: the
    /// previous implementation held it across `send_timeout`, which could
    /// stall every stats reader (and `recv`'s bookkeeping) for up to
    /// `block_timeout`.
    pub fn send(&self, record: StreamRecord) -> Result<()> {
        {
            let mut stats = write_lock_safe!(self.stats, "backpressure channel stats write")?;
            stats.records_received += 1;
        }
        match self.config.strategy {
            BackpressureStrategy::Block => {
                match self.sender.send_timeout(record, self.config.block_timeout) {
                    Ok(_) => {
                        self.buffer_size.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                    Err(_) => {
                        self.note_drop()?;
                        Err(Error::IoError("Channel send timeout".into()))
                    }
                }
            }
            BackpressureStrategy::DropNewest => {
                if self.buffer_size.load(Ordering::SeqCst) >= self.config.high_watermark {
                    self.note_drop()?;
                    self.backpressure_active.store(true, Ordering::SeqCst);
                    Ok(())
                } else {
                    self.try_send_or_drop(record)
                }
            }
            _ => self.try_send_or_drop(record),
        }
    }

    /// Non-blocking send: a full channel counts as a drop (not an error); a
    /// disconnected channel is an error.
    fn try_send_or_drop(&self, record: StreamRecord) -> Result<()> {
        match self.sender.try_send(record) {
            Ok(_) => {
                self.buffer_size.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
            Err(TrySendError::Full(_)) => {
                self.note_drop()?;
                Ok(())
            }
            Err(TrySendError::Disconnected(_)) => {
                Err(Error::IoError("Channel disconnected".into()))
            }
        }
    }

    /// Records one dropped record plus one backpressure event.
    fn note_drop(&self) -> Result<()> {
        let mut stats = write_lock_safe!(self.stats, "backpressure channel stats write")?;
        stats.records_dropped += 1;
        stats.backpressure_events += 1;
        Ok(())
    }

    /// Decrements the occupancy counter, saturating at zero, and returns the
    /// new value. A plain `fetch_sub` could wrap to `usize::MAX` when a
    /// receiver observes a record before the sender's matching `fetch_add`
    /// has happened.
    fn decrement_size(&self) -> usize {
        let prev = self
            .buffer_size
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
                Some(v.saturating_sub(1))
            })
            // The closure always returns `Some`, so this is unreachable.
            .unwrap_or(0);
        prev.saturating_sub(1)
    }

    /// Blocking receive: updates occupancy and stats, and clears the
    /// backpressure flag once occupancy drops below the low watermark.
    pub fn recv(&self) -> Result<StreamRecord> {
        match self.receiver.recv() {
            Ok(record) => {
                let current_size = self.decrement_size();
                if current_size < self.config.low_watermark {
                    self.backpressure_active.store(false, Ordering::SeqCst);
                }
                let mut stats = write_lock_safe!(self.stats, "backpressure channel stats write")?;
                stats.records_processed += 1;
                stats.current_buffer_size = current_size;
                Ok(record)
            }
            Err(_) => Err(Error::IoError("Channel receive failed".into())),
        }
    }

    /// Receive with a timeout; `Ok(None)` on timeout. Mirrors `recv`,
    /// including the low-watermark backpressure release that the previous
    /// implementation performed only in `recv`.
    pub fn recv_timeout(&self, timeout: Duration) -> Result<Option<StreamRecord>> {
        match self.receiver.recv_timeout(timeout) {
            Ok(record) => {
                let current_size = self.decrement_size();
                if current_size < self.config.low_watermark {
                    self.backpressure_active.store(false, Ordering::SeqCst);
                }
                let mut stats = write_lock_safe!(self.stats, "backpressure channel stats write")?;
                stats.records_processed += 1;
                stats.current_buffer_size = current_size;
                Ok(Some(record))
            }
            Err(crossbeam_channel::RecvTimeoutError::Timeout) => Ok(None),
            Err(_) => Err(Error::IoError("Channel disconnected".into())),
        }
    }

    /// Approximate number of records currently buffered.
    pub fn len(&self) -> usize {
        self.buffer_size.load(Ordering::SeqCst)
    }

    /// True when no records are buffered.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Snapshot of the channel statistics.
    pub fn stats(&self) -> Result<BackpressureStats> {
        Ok(read_lock_safe!(self.stats, "backpressure stats read")?.clone())
    }

    /// True while backpressure is engaged.
    pub fn is_backpressure_active(&self) -> bool {
        self.backpressure_active.load(Ordering::SeqCst)
    }
}
/// Throttles producers toward a target throughput by measuring records/sec
/// over fixed windows and sleeping the caller when the target is exceeded.
#[derive(Debug)]
pub struct FlowController {
    // Desired records/sec. Plain `f64`, so it is fixed at construction;
    // `set_target_throughput` cannot actually mutate it through `&self`.
    target_throughput: f64,
    // Throughput measured over the last completed window.
    current_throughput: Arc<RwLock<f64>>,
    // Records counted in the current window; reset at each window boundary.
    record_count: Arc<AtomicU64>,
    // Start of the current measurement window.
    window_start: Arc<RwLock<Instant>>,
    // Length of each measurement window.
    window_duration: Duration,
    // When false, `record_processed` is a no-op (see `pause`/`resume`).
    active: Arc<AtomicBool>,
}
impl FlowController {
    /// Creates a controller targeting `target_throughput` records/sec,
    /// measured over windows of `window_duration`. Starts active.
    pub fn new(target_throughput: f64, window_duration: Duration) -> Self {
        FlowController {
            target_throughput,
            current_throughput: Arc::new(RwLock::new(0.0)),
            record_count: Arc::new(AtomicU64::new(0)),
            window_start: Arc::new(RwLock::new(Instant::now())),
            window_duration,
            active: Arc::new(AtomicBool::new(true)),
        }
    }

    /// Records one processed record and, at each window boundary, recomputes
    /// the measured throughput. When throughput exceeds the target by more
    /// than 10%, the calling thread sleeps briefly (at most 100 ms) to
    /// throttle it. Always returns `Ok(true)`.
    ///
    /// The window check and reset now happen under a single write lock so
    /// two threads cannot both observe an expired window and double-reset it
    /// (the previous code checked under a read lock and reset under a
    /// separate write lock). The throttle sleep happens after the lock is
    /// released so other threads are not stalled.
    pub fn record_processed(&self) -> Result<bool> {
        if !self.active.load(Ordering::SeqCst) {
            return Ok(true);
        }
        let count = self.record_count.fetch_add(1, Ordering::SeqCst) + 1;
        let mut throttle = None;
        {
            let mut window_start =
                write_lock_safe!(self.window_start, "flow controller window start write")?;
            let elapsed = window_start.elapsed();
            if elapsed >= self.window_duration {
                let throughput = count as f64 / elapsed.as_secs_f64();
                *write_lock_safe!(self.current_throughput, "flow controller throughput write")? =
                    throughput;
                self.record_count.store(0, Ordering::SeqCst);
                *window_start = Instant::now();
                if throughput > self.target_throughput * 1.1 {
                    // Delay proportional to the overshoot, capped at 100 ms.
                    let delay_ms =
                        ((throughput / self.target_throughput - 1.0) * 100.0) as u64;
                    throttle = Some(Duration::from_millis(delay_ms.min(100)));
                }
            }
        }
        if let Some(delay) = throttle {
            thread::sleep(delay);
        }
        Ok(true)
    }

    /// Returns the throughput measured over the last completed window
    /// (0.0 before the first window completes).
    pub fn current_throughput(&self) -> Result<f64> {
        Ok(*read_lock_safe!(
            self.current_throughput,
            "flow controller throughput read"
        )?)
    }

    /// NOTE(review): this setter is a no-op — `target_throughput` is a plain
    /// `f64` field and cannot be mutated through `&self`. Making it work
    /// requires interior mutability on the field (e.g. `RwLock<f64>` or
    /// atomic bits), which would change the struct; left as-is so the
    /// limitation is at least explicit. TODO: wire this up or remove it.
    pub fn set_target_throughput(&self, _target: f64) {}

    /// Suspends throttling; `record_processed` becomes a cheap no-op.
    pub fn pause(&self) {
        self.active.store(false, Ordering::SeqCst);
    }

    /// Re-enables throttling.
    pub fn resume(&self) {
        self.active.store(true, Ordering::SeqCst);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    // Builds a minimal single-field record used by all tests below.
    fn create_test_record() -> StreamRecord {
        let mut fields = HashMap::new();
        fields.insert("value".to_string(), "42".to_string());
        StreamRecord::new(fields)
    }

    // Well below the default watermarks: everything is buffered, no
    // backpressure is signaled.
    #[test]
    fn test_backpressure_buffer_normal_operation() {
        let config = BackpressureConfig::default();
        let buffer = BackpressureBuffer::new(config);
        for _ in 0..100 {
            buffer
                .push(create_test_record())
                .expect("operation should succeed");
        }
        assert_eq!(buffer.len().expect("operation should succeed"), 100);
        assert!(!buffer.is_backpressure_active());
    }

    // DropOldest keeps the buffer bounded at the high watermark by evicting
    // from the front.
    #[test]
    fn test_backpressure_buffer_drop_oldest() {
        let config = BackpressureConfigBuilder::new()
            .high_watermark(10)
            .low_watermark(5)
            .strategy(BackpressureStrategy::DropOldest)
            .build();
        let buffer = BackpressureBuffer::new(config);
        for _ in 0..20 {
            buffer
                .try_push(create_test_record())
                .expect("operation should succeed");
        }
        assert!(buffer.len().expect("operation should succeed") <= 10);
    }

    // DropNewest discards incoming records once the high watermark is hit,
    // leaving exactly high_watermark records buffered.
    #[test]
    fn test_backpressure_buffer_drop_newest() {
        let config = BackpressureConfigBuilder::new()
            .high_watermark(10)
            .low_watermark(5)
            .strategy(BackpressureStrategy::DropNewest)
            .build();
        let buffer = BackpressureBuffer::new(config);
        for _ in 0..20 {
            buffer
                .try_push(create_test_record())
                .expect("operation should succeed");
        }
        assert_eq!(buffer.len().expect("operation should succeed"), 10);
    }

    // 20 offered, capacity 10, DropNewest: 10 dropped and at least one
    // backpressure event recorded.
    #[test]
    fn test_backpressure_stats() {
        let config = BackpressureConfigBuilder::new()
            .high_watermark(10)
            .low_watermark(5)
            .strategy(BackpressureStrategy::DropNewest)
            .build();
        let buffer = BackpressureBuffer::new(config);
        for _ in 0..20 {
            buffer
                .try_push(create_test_record())
                .expect("operation should succeed");
        }
        let stats = buffer.stats().expect("operation should succeed");
        assert_eq!(stats.records_received, 20);
        assert_eq!(stats.records_dropped, 10);
        assert!(stats.backpressure_events > 0);
    }

    // The channel's occupancy counter tracks sends and receives.
    #[test]
    fn test_backpressure_channel() {
        let config = BackpressureConfigBuilder::new()
            .high_watermark(100)
            .low_watermark(50)
            .build();
        let channel = BackpressureChannel::new(config);
        for _ in 0..50 {
            channel
                .send(create_test_record())
                .expect("operation should succeed");
        }
        assert_eq!(channel.len(), 50);
        for _ in 0..25 {
            channel.recv().expect("operation should succeed");
        }
        assert_eq!(channel.len(), 25);
    }

    // record_processed keeps returning Ok(true) even while paused.
    #[test]
    fn test_flow_controller() {
        let controller = FlowController::new(1000.0, Duration::from_millis(100));
        for _ in 0..100 {
            let _ = controller.record_processed();
        }
        controller.pause();
        assert!(controller
            .record_processed()
            .expect("operation should succeed"));
    }

    // pop_batch removes records FIFO and leaves the remainder buffered.
    #[test]
    fn test_backpressure_pop_batch() {
        let config = BackpressureConfig::default();
        let buffer = BackpressureBuffer::new(config);
        for _ in 0..100 {
            buffer
                .push(create_test_record())
                .expect("operation should succeed");
        }
        let batch = buffer.pop_batch(30).expect("operation should succeed");
        assert_eq!(batch.len(), 30);
        assert_eq!(buffer.len().expect("operation should succeed"), 70);
    }
}