use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use thiserror::Error;
use tokio::sync::{RwLock, Semaphore, mpsc, oneshot};
/// Capacity of the bounded channel that carries submitted requests to the worker.
const MAX_PENDING_REQUESTS: usize = 10_000;
/// Default value, in milliseconds, for `PipelineConfig::batch_timeout_ms`.
const DEFAULT_BATCH_TIMEOUT_MS: u64 = 100;
/// Errors that the request pipeline can report to a caller.
#[derive(Debug, Error)]
pub enum PipelineError {
/// The pipeline cannot accept more requests.
///
/// NOTE(review): nothing in the visible source constructs this variant.
#[error("Pipeline is full, cannot accept more requests")]
PipelineFull,
/// The request waited longer than the allowed queue time; the payload is
/// the limit, in milliseconds, that was exceeded.
#[error("Request timeout after {0}ms")]
RequestTimeout(u64),
/// Executing a batch failed; the payload is a human-readable reason.
#[error("Batch execution failed: {0}")]
BatchFailed(String),
/// The response channel was dropped before a result was delivered.
#[error("Request cancelled")]
Cancelled,
/// The request was rejected as invalid; the payload is a human-readable reason.
#[error("Invalid request: {0}")]
InvalidRequest(String),
/// The request could not be handed to the worker because its channel is closed.
#[error("Pipeline is shutting down")]
ShuttingDown,
}
/// Relative importance of a request.
///
/// The derived ordering follows the declaration order, so
/// `Low < Normal < High < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum RequestPriority {
/// Lowest priority.
Low = 0,
/// Priority assigned to requests by default.
Normal = 1,
/// Above-normal priority.
High = 2,
/// Highest priority.
Critical = 3,
}
impl Default for RequestPriority {
#[inline]
fn default() -> Self {
Self::Normal
}
}
/// A unit of work submitted to the pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineRequest {
/// Name of the operation to perform.
pub operation: String,
/// Opaque request body.
pub payload: Vec<u8>,
/// Priority attached to the request.
pub priority: RequestPriority,
/// Identifier echoed back in the matching response.
pub request_id: String,
/// Creation time in milliseconds since the Unix epoch.
pub created_at_ms: u64,
}
impl PipelineRequest {
    /// Builds a request with [`RequestPriority::Normal`], a freshly generated
    /// request id and the current wall-clock time as its creation timestamp.
    #[must_use]
    pub fn new(operation: impl Into<String>, payload: Vec<u8>) -> Self {
        let operation = operation.into();
        let request_id = generate_request_id();
        let created_at_ms = current_timestamp_ms();
        Self {
            operation,
            payload,
            priority: RequestPriority::Normal,
            request_id,
            created_at_ms,
        }
    }

    /// Returns the request with its priority replaced by `priority`.
    #[must_use]
    pub fn with_priority(self, priority: RequestPriority) -> Self {
        Self { priority, ..self }
    }

    /// Returns the request with its identifier replaced by `id`.
    #[must_use]
    pub fn with_request_id(self, id: String) -> Self {
        Self {
            request_id: id,
            ..self
        }
    }
}
/// Outcome of a single request as reported back to the caller.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineResponse {
/// Identifier of the request this response belongs to.
pub request_id: String,
/// Whether the request was handled successfully.
pub success: bool,
/// Opaque response body.
pub payload: Vec<u8>,
/// Optional error description.
pub error: Option<String>,
/// Time spent executing the request, in milliseconds.
pub processing_time_ms: u64,
}
impl PipelineResponse {
    /// Returns `true` when the `success` flag is set.
    #[must_use]
    #[inline]
    pub const fn is_success(&self) -> bool {
        self.success
    }

    /// Returns `true` when the `success` flag is not set.
    #[must_use]
    #[inline]
    pub const fn is_failure(&self) -> bool {
        !self.is_success()
    }

    /// Borrows the error description, if one is present.
    #[must_use]
    #[inline]
    pub fn error_message(&self) -> Option<&str> {
        match &self.error {
            Some(message) => Some(message.as_str()),
            None => None,
        }
    }
}
/// Tunable settings for a `RequestPipeline`.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
/// Number of queued requests at which the worker flushes a batch.
pub max_batch_size: usize,
/// Number of permits given to the worker's semaphore.
pub max_concurrent: usize,
/// Milliseconds the worker waits before flushing a non-empty batch that has not filled up.
pub batch_timeout_ms: u64,
/// Maximum time, in milliseconds, a request may wait before the worker
/// rejects it with `PipelineError::RequestTimeout`.
pub max_queue_time_ms: u64,
/// Whether requests with the same operation and payload are deduplicated by the worker.
pub enable_deduplication: bool,
}
impl Default for PipelineConfig {
#[inline]
fn default() -> Self {
Self {
max_batch_size: 50,
max_concurrent: 10,
batch_timeout_ms: DEFAULT_BATCH_TIMEOUT_MS,
max_queue_time_ms: 5_000,
enable_deduplication: true,
}
}
}
/// Counters and running averages describing pipeline activity.
#[derive(Debug, Clone, Default)]
pub struct PipelineStats {
    /// Number of requests accepted for processing.
    pub total_requests: u64,
    /// Number of executed requests that produced `Ok`.
    pub successful_requests: u64,
    /// Number of executed requests that produced `Err`.
    pub failed_requests: u64,
    /// Number of requests recognised as duplicates of an already queued request.
    pub deduplicated_requests: u64,
    /// Number of batches processed so far.
    pub total_batches: u64,
    /// Running mean of the processed batch sizes.
    pub avg_batch_size: f64,
    /// Running mean of the per-request execution time, in milliseconds.
    pub avg_latency_ms: f64,
    /// Number of requests waiting in the submission channel when last recorded.
    pub queue_depth: usize,
}

impl PipelineStats {
    /// Number of requests that finished executing, successfully or not.
    #[inline]
    fn processed_requests(&self) -> u64 {
        self.successful_requests + self.failed_requests
    }

    /// Percentage (0–100) of processed requests that succeeded.
    ///
    /// Returns `0.0` when no request has been processed yet.
    #[must_use]
    #[inline]
    pub fn success_rate(&self) -> f64 {
        let total_processed = self.processed_requests();
        if total_processed == 0 {
            0.0
        } else {
            (self.successful_requests as f64 / total_processed as f64) * 100.0
        }
    }

    /// Percentage (0–100) of processed requests that failed.
    ///
    /// Returns `0.0` when no request has been processed yet. Without this
    /// guard an idle pipeline would report a 100% failure rate, because the
    /// value is derived from a success rate that is `0.0` in that situation.
    #[must_use]
    #[inline]
    pub fn failure_rate(&self) -> f64 {
        if self.processed_requests() == 0 {
            0.0
        } else {
            100.0 - self.success_rate()
        }
    }

    /// Percentage (0–100) of accepted requests that were deduplicated.
    ///
    /// Returns `0.0` when no request has been accepted yet.
    #[must_use]
    #[inline]
    pub fn dedup_rate(&self) -> f64 {
        if self.total_requests == 0 {
            0.0
        } else {
            (self.deduplicated_requests as f64 / self.total_requests as f64) * 100.0
        }
    }
}
/// A submitted request together with the channel used to answer it.
struct PendingRequest {
// The request as supplied by the caller.
request: PipelineRequest,
// One-shot channel on which the worker delivers the outcome.
response_tx: oneshot::Sender<Result<PipelineResponse, PipelineError>>,
// When the request was handed to the pipeline; compared against the queue-time limit.
queued_at: Instant,
}
/// Handle to a batching request pipeline backed by a spawned worker task.
pub struct RequestPipeline {
// Configuration the pipeline was created with.
config: PipelineConfig,
// Sending half of the bounded channel that feeds the worker.
request_tx: mpsc::Sender<PendingRequest>,
// Statistics shared between this handle and the worker.
stats: Arc<RwLock<PipelineStats>>,
// Join handle of the spawned worker task; stored but not awaited in the visible code.
_worker_handle: tokio::task::JoinHandle<()>,
}
impl RequestPipeline {
    /// Creates a pipeline and spawns its background worker task.
    ///
    /// Requests reach the worker over a bounded channel holding at most
    /// `MAX_PENDING_REQUESTS` entries. The worker exits once every sender of
    /// that channel has been dropped and the open batch has been flushed.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside of a Tokio runtime.
    pub fn new(config: PipelineConfig) -> Self {
        let (request_tx, request_rx) = mpsc::channel(MAX_PENDING_REQUESTS);
        let stats = Arc::new(RwLock::new(PipelineStats::default()));
        let worker_handle = tokio::spawn(Self::pipeline_worker(
            config.clone(),
            request_rx,
            Arc::clone(&stats),
        ));
        Self {
            config,
            request_tx,
            stats,
            _worker_handle: worker_handle,
        }
    }

    /// Submits one request and waits for its response.
    ///
    /// When the channel to the worker is full this call waits for free space
    /// rather than failing.
    ///
    /// # Errors
    ///
    /// * [`PipelineError::ShuttingDown`] – the worker's channel is closed.
    /// * [`PipelineError::Cancelled`] – the worker dropped the response
    ///   channel without answering.
    /// * Any error the worker reports for the request, such as
    ///   [`PipelineError::RequestTimeout`].
    pub async fn submit(
        &self,
        request: PipelineRequest,
    ) -> Result<PipelineResponse, PipelineError> {
        let response_rx = self.enqueue(request).await?;
        response_rx.await.map_err(|_| PipelineError::Cancelled)?
    }

    /// Submits several requests and returns their outcomes in input order.
    ///
    /// Every request is queued before any response is awaited, so the worker
    /// can group them into batches. Awaiting each response before queueing
    /// the next request would make every request wait for its own batch
    /// timeout and would never let two of them share a batch.
    ///
    /// All requests are queued up front, so requests late in a very large
    /// input may exceed `max_queue_time_ms` and come back as
    /// [`PipelineError::RequestTimeout`].
    pub async fn submit_batch(
        &self,
        requests: Vec<PipelineRequest>,
    ) -> Vec<Result<PipelineResponse, PipelineError>> {
        // Phase 1: hand everything to the worker, remembering per-request
        // enqueue failures so the output keeps the input order.
        let mut in_flight = Vec::with_capacity(requests.len());
        for request in requests {
            in_flight.push(self.enqueue(request).await);
        }

        // Phase 2: collect the answers.
        let mut results = Vec::with_capacity(in_flight.len());
        for slot in in_flight {
            let outcome = match slot {
                Ok(response_rx) => match response_rx.await {
                    Ok(outcome) => outcome,
                    Err(_) => Err(PipelineError::Cancelled),
                },
                Err(err) => Err(err),
            };
            results.push(outcome);
        }
        results
    }

    /// Returns a snapshot of the pipeline statistics.
    ///
    /// `queue_depth` is computed at call time from the channel's occupancy,
    /// so it reflects requests the worker has not picked up yet instead of a
    /// value frozen at the last submission.
    pub async fn stats(&self) -> PipelineStats {
        let mut snapshot = self.stats.read().await.clone();
        snapshot.queue_depth = self.request_tx.max_capacity() - self.request_tx.capacity();
        snapshot
    }

    /// Returns the configuration this pipeline was created with.
    #[must_use]
    pub fn config(&self) -> &PipelineConfig {
        &self.config
    }

    /// Queues `request` for the worker and returns the receiver on which its
    /// outcome will arrive. Counts the request in `total_requests`.
    async fn enqueue(
        &self,
        request: PipelineRequest,
    ) -> Result<oneshot::Receiver<Result<PipelineResponse, PipelineError>>, PipelineError> {
        let (response_tx, response_rx) = oneshot::channel();
        let pending = PendingRequest {
            request,
            response_tx,
            queued_at: Instant::now(),
        };
        self.request_tx
            .send(pending)
            .await
            .map_err(|_| PipelineError::ShuttingDown)?;
        self.stats.write().await.total_requests += 1;
        Ok(response_rx)
    }

    /// Worker loop: groups incoming requests into batches and executes them.
    ///
    /// A batch is flushed when it holds `max_batch_size` entries or when
    /// `batch_timeout_ms` has passed since its first request arrived. The
    /// timeout is measured from the moment the batch was opened, so a steady
    /// trickle of requests cannot postpone the flush indefinitely.
    ///
    /// With deduplication enabled, a request whose operation and payload
    /// match a request already in the open batch is attached to that request
    /// and answered with a copy of its result.
    async fn pipeline_worker(
        config: PipelineConfig,
        mut request_rx: mpsc::Receiver<PendingRequest>,
        stats: Arc<RwLock<PipelineStats>>,
    ) {
        let batch_timeout = Duration::from_millis(config.batch_timeout_ms);
        let max_queue_time = Duration::from_millis(config.max_queue_time_ms);
        // NOTE(review): batches are awaited inline below, so at most one
        // permit is ever held and `max_concurrent` has no visible effect.
        let semaphore = Arc::new(Semaphore::new(config.max_concurrent));

        // Each entry is a leader request plus the duplicates sharing its result.
        let mut batch: Vec<(PendingRequest, Vec<PendingRequest>)> =
            Vec::with_capacity(config.max_batch_size);
        // Dedup key -> index of the leader inside `batch`.
        let mut dedup_index: HashMap<String, usize> = HashMap::new();
        // When the first request of the open batch arrived.
        let mut batch_opened_at: Option<Instant> = None;

        loop {
            let wait = batch_opened_at.map_or(batch_timeout, |opened| {
                batch_timeout.saturating_sub(opened.elapsed())
            });
            let timeout = tokio::time::sleep(wait);
            tokio::pin!(timeout);

            tokio::select! {
                Some(pending) = request_rx.recv() => {
                    if pending.queued_at.elapsed() > max_queue_time {
                        // A send error only means the caller stopped waiting.
                        let _ = pending
                            .response_tx
                            .send(Err(PipelineError::RequestTimeout(config.max_queue_time_ms)));
                        continue;
                    }
                    if config.enable_deduplication {
                        let dedup_key = format!(
                            "{}:{}",
                            pending.request.operation,
                            hex::encode(&pending.request.payload)
                        );
                        if let Some(&leader_idx) = dedup_index.get(&dedup_key) {
                            if let Some((_, duplicates)) = batch.get_mut(leader_idx) {
                                duplicates.push(pending);
                                stats.write().await.deduplicated_requests += 1;
                                continue;
                            }
                        }
                        dedup_index.insert(dedup_key, batch.len());
                    }
                    if batch.is_empty() {
                        batch_opened_at = Some(Instant::now());
                    }
                    batch.push((pending, Vec::new()));
                    if batch.len() < config.max_batch_size {
                        continue;
                    }
                    // Batch is full: fall through to the flush below.
                }
                // Only armed while something is waiting to be flushed.
                () = &mut timeout, if !batch.is_empty() => {}
                // Channel closed and nothing left to flush.
                else => break,
            }

            let current_batch =
                std::mem::replace(&mut batch, Vec::with_capacity(config.max_batch_size));
            dedup_index.clear();
            batch_opened_at = None;
            Self::process_batch(current_batch, Arc::clone(&semaphore), Arc::clone(&stats)).await;
        }
    }

    /// Executes every leader request of `batch` in order and delivers the
    /// result to the leader and to each duplicate attached to it.
    ///
    /// Success/failure counters and the latency average are updated once per
    /// executed request; `avg_batch_size` counts executed requests only, not
    /// the duplicates that piggy-back on them.
    ///
    /// # Panics
    ///
    /// Panics if the semaphore has been closed; nothing in this file closes it.
    async fn process_batch(
        batch: Vec<(PendingRequest, Vec<PendingRequest>)>,
        semaphore: Arc<Semaphore>,
        stats: Arc<RwLock<PipelineStats>>,
    ) {
        let batch_size = batch.len();
        let _permit = semaphore.acquire().await.expect("Semaphore closed");

        for (leader, duplicates) in batch {
            let started = Instant::now();
            let outcome = Self::execute_request(&leader.request).await;
            // Saturate instead of silently truncating the u128 millisecond count.
            let processing_time_ms =
                u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
            let outcome = outcome.map(|mut response| {
                response.processing_time_ms = processing_time_ms;
                response
            });

            {
                let mut guard = stats.write().await;
                match &outcome {
                    Ok(_) => guard.successful_requests += 1,
                    Err(_) => guard.failed_requests += 1,
                }
                // Incremental mean: rebuild the previous sum, add the new sample.
                let processed = guard.successful_requests + guard.failed_requests;
                let prior_total = guard.avg_latency_ms * (processed - 1) as f64;
                guard.avg_latency_ms =
                    (prior_total + processing_time_ms as f64) / processed as f64;
            }

            for duplicate in duplicates {
                let mut copy = Self::clone_outcome(&outcome);
                if let Ok(response) = &mut copy {
                    // Each caller sees its own request id on the response.
                    response.request_id = duplicate.request.request_id;
                }
                // A send error only means the caller stopped waiting.
                let _ = duplicate.response_tx.send(copy);
            }
            let _ = leader.response_tx.send(outcome);
        }

        let mut guard = stats.write().await;
        guard.total_batches += 1;
        let prior_total = guard.avg_batch_size * (guard.total_batches - 1) as f64;
        guard.avg_batch_size = (prior_total + batch_size as f64) / guard.total_batches as f64;
    }

    /// Produces an independent copy of a request outcome.
    ///
    /// `PipelineError` is not `Clone`, so errors are rebuilt variant by
    /// variant; the match is exhaustive on purpose so that a new variant
    /// forces this function to be revisited.
    fn clone_outcome(
        outcome: &Result<PipelineResponse, PipelineError>,
    ) -> Result<PipelineResponse, PipelineError> {
        match outcome {
            Ok(response) => Ok(response.clone()),
            Err(err) => Err(match err {
                PipelineError::PipelineFull => PipelineError::PipelineFull,
                PipelineError::RequestTimeout(limit_ms) => {
                    PipelineError::RequestTimeout(*limit_ms)
                }
                PipelineError::BatchFailed(reason) => PipelineError::BatchFailed(reason.clone()),
                PipelineError::Cancelled => PipelineError::Cancelled,
                PipelineError::InvalidRequest(reason) => {
                    PipelineError::InvalidRequest(reason.clone())
                }
                PipelineError::ShuttingDown => PipelineError::ShuttingDown,
            }),
        }
    }

    /// Executes a single request.
    ///
    /// The current body sleeps for 10 ms and returns a successful response
    /// with an empty payload; `processing_time_ms` is filled in by the caller.
    async fn execute_request(request: &PipelineRequest) -> Result<PipelineResponse, PipelineError> {
        tokio::time::sleep(Duration::from_millis(10)).await;
        Ok(PipelineResponse {
            request_id: request.request_id.clone(),
            success: true,
            payload: vec![],
            error: None,
            processing_time_ms: 0,
        })
    }
}
/// Produces a request id of the form `req-<timestamp_ms>-<sequence>`.
///
/// The sequence number comes from a process-wide atomic counter, so two ids
/// generated in the same millisecond still differ.
fn generate_request_id() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};

    static NEXT_SEQUENCE: AtomicU64 = AtomicU64::new(0);

    let sequence = NEXT_SEQUENCE.fetch_add(1, Ordering::SeqCst);
    let timestamp_ms = current_timestamp_ms();
    format!("req-{}-{}", timestamp_ms, sequence)
}
/// Returns the wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn current_timestamp_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a pipeline from the default configuration.
    fn default_pipeline() -> RequestPipeline {
        RequestPipeline::new(PipelineConfig::default())
    }

    /// The default configuration carries the documented values.
    #[test]
    fn test_pipeline_config_default() {
        let PipelineConfig {
            max_batch_size,
            max_concurrent,
            batch_timeout_ms,
            enable_deduplication,
            ..
        } = PipelineConfig::default();
        assert_eq!(max_batch_size, 50);
        assert_eq!(max_concurrent, 10);
        assert_eq!(batch_timeout_ms, DEFAULT_BATCH_TIMEOUT_MS);
        assert!(enable_deduplication);
    }

    /// Priorities compare in ascending order of urgency.
    #[test]
    fn test_request_priority_order() {
        let ascending = [
            RequestPriority::Low,
            RequestPriority::Normal,
            RequestPriority::High,
            RequestPriority::Critical,
        ];
        for pair in ascending.windows(2) {
            assert!(pair[1] > pair[0]);
        }
    }

    /// `new` stores the operation and payload and defaults the priority.
    #[test]
    fn test_pipeline_request_creation() {
        let payload = vec![1, 2, 3];
        let request = PipelineRequest::new("test_op", payload.clone());
        assert_eq!(request.operation, "test_op");
        assert_eq!(request.payload, payload);
        assert_eq!(request.priority, RequestPriority::Normal);
    }

    /// `with_priority` overrides the default priority.
    #[test]
    fn test_pipeline_request_with_priority() {
        let base = PipelineRequest::new("test_op", vec![]);
        let request = base.with_priority(RequestPriority::High);
        assert_eq!(request.priority, RequestPriority::High);
    }

    /// `with_request_id` overrides the generated identifier.
    #[test]
    fn test_pipeline_request_with_id() {
        let custom_id = "custom-id".to_string();
        let request = PipelineRequest::new("test_op", vec![]).with_request_id(custom_id);
        assert_eq!(request.request_id, "custom-id");
    }

    /// Constructing a pipeline inside a runtime does not panic.
    #[tokio::test]
    async fn test_pipeline_creation() {
        let _pipeline = default_pipeline();
    }

    /// A single submitted request comes back successful.
    #[tokio::test]
    async fn test_pipeline_submit_single_request() {
        let pipeline = default_pipeline();
        let outcome = pipeline
            .submit(PipelineRequest::new("test", vec![1, 2, 3]))
            .await;
        assert!(outcome.is_ok());
        let response = outcome.unwrap();
        assert!(response.is_success());
    }

    /// Every request of a submitted batch gets an `Ok` outcome.
    #[tokio::test]
    async fn test_pipeline_submit_batch() {
        let pipeline = default_pipeline();
        let requests: Vec<PipelineRequest> = (1u8..=3)
            .map(|i| PipelineRequest::new(format!("test{}", i), vec![i]))
            .collect();
        let responses = pipeline.submit_batch(requests).await;
        assert_eq!(responses.len(), 3);
        assert!(responses.iter().all(|response| response.is_ok()));
    }

    /// A submitted request is counted in the statistics.
    #[tokio::test]
    async fn test_pipeline_stats() {
        let pipeline = default_pipeline();
        let _ = pipeline
            .submit(PipelineRequest::new("test", vec![1, 2, 3]))
            .await;
        tokio::time::sleep(Duration::from_millis(100)).await;
        let snapshot = pipeline.stats().await;
        assert_eq!(snapshot.total_requests, 1);
    }

    /// Consecutive generated ids differ.
    #[test]
    fn test_generate_request_id_uniqueness() {
        let first = generate_request_id();
        let second = generate_request_id();
        assert_ne!(first, second);
    }

    /// The default priority is `Normal`.
    #[test]
    fn test_request_priority_default() {
        assert_eq!(RequestPriority::default(), RequestPriority::Normal);
    }
}