use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use thiserror::Error;
use tokio::sync::{RwLock, Semaphore, mpsc, oneshot};

/// Upper bound on requests buffered in the submission channel.
const MAX_PENDING_REQUESTS: usize = 10_000;

/// How long a partial batch may wait before it is flushed.
const DEFAULT_BATCH_TIMEOUT_MS: u64 = 100;

/// Errors surfaced to callers of the pipeline. `Clone` lets a
/// deduplicated waiter receive a copy of the leader's result.
#[derive(Debug, Clone, Error)]
pub enum PipelineError {
    #[error("Pipeline is full, cannot accept more requests")]
    PipelineFull,

    #[error("Request timeout after {0}ms")]
    RequestTimeout(u64),

    #[error("Batch execution failed: {0}")]
    BatchFailed(String),

    #[error("Request cancelled")]
    Cancelled,

    #[error("Invalid request: {0}")]
    InvalidRequest(String),

    #[error("Pipeline is shutting down")]
    ShuttingDown,
}

/// Relative priority of a request; `Critical` outranks `High`, and so on.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum RequestPriority {
    Low = 0,
    Normal = 1,
    High = 2,
    Critical = 3,
}

impl Default for RequestPriority {
    #[inline]
    fn default() -> Self {
        Self::Normal
    }
}

/// A unit of work submitted to the pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineRequest {
    pub operation: String,
    pub payload: Vec<u8>,
    pub priority: RequestPriority,
    pub request_id: String,
    pub created_at_ms: u64,
}

impl PipelineRequest {
    /// Creates a request with `Normal` priority and a generated id.
    #[must_use]
    pub fn new(operation: impl Into<String>, payload: Vec<u8>) -> Self {
        Self {
            operation: operation.into(),
            payload,
            priority: RequestPriority::Normal,
            request_id: generate_request_id(),
            created_at_ms: current_timestamp_ms(),
        }
    }

    /// Overrides the priority (builder style).
    #[must_use]
    pub fn with_priority(mut self, priority: RequestPriority) -> Self {
        self.priority = priority;
        self
    }

    /// Overrides the generated request id (builder style).
    #[must_use]
    pub fn with_request_id(mut self, id: String) -> Self {
        self.request_id = id;
        self
    }
}

/// The outcome of one pipeline request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineResponse {
    pub request_id: String,
    pub success: bool,
    pub payload: Vec<u8>,
    pub error: Option<String>,
    pub processing_time_ms: u64,
}

impl PipelineResponse {
    #[must_use]
    #[inline]
    pub const fn is_success(&self) -> bool {
        self.success
    }

    #[must_use]
    #[inline]
    pub const fn is_failure(&self) -> bool {
        !self.success
    }

    /// The error message, if the request failed.
    #[must_use]
    #[inline]
    pub fn error_message(&self) -> Option<&str> {
        self.error.as_deref()
    }
}

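/// Tuning knobs for [`RequestPipeline`]: batch sizing, concurrency,
/// flush timeout, queue-time budget, and deduplication.
///
/// A minimal sketch of overriding individual fields via struct-update
/// syntax (the crate name `request_pipeline` is a placeholder, hence
/// `ignore`):
///
/// ```ignore
/// use request_pipeline::PipelineConfig;
///
/// let config = PipelineConfig {
///     max_batch_size: 100,
///     batch_timeout_ms: 50,
///     ..PipelineConfig::default()
/// };
/// ```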
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    pub max_batch_size: usize,
    pub max_concurrent: usize,
    pub batch_timeout_ms: u64,
    pub max_queue_time_ms: u64,
    pub enable_deduplication: bool,
}

impl Default for PipelineConfig {
    #[inline]
    fn default() -> Self {
        Self {
            max_batch_size: 50,
            max_concurrent: 10,
            batch_timeout_ms: DEFAULT_BATCH_TIMEOUT_MS,
            max_queue_time_ms: 5_000,
            enable_deduplication: true,
        }
    }
}

/// Rolling counters and derived metrics for the pipeline.
#[derive(Debug, Clone, Default)]
pub struct PipelineStats {
    pub total_requests: u64,
    pub successful_requests: u64,
    pub failed_requests: u64,
    pub deduplicated_requests: u64,
    pub total_batches: u64,
    pub avg_batch_size: f64,
    pub avg_latency_ms: f64,
    pub queue_depth: usize,
}

impl PipelineStats {
    /// Percentage of processed requests that succeeded (0.0 when nothing
    /// has been processed yet).
    #[must_use]
    #[inline]
    pub fn success_rate(&self) -> f64 {
        let total_processed = self.successful_requests + self.failed_requests;
        if total_processed == 0 {
            0.0
        } else {
            (self.successful_requests as f64 / total_processed as f64) * 100.0
        }
    }

    /// Percentage of processed requests that failed.
    #[must_use]
    #[inline]
    pub fn failure_rate(&self) -> f64 {
        100.0 - self.success_rate()
    }

    /// Percentage of submitted requests answered by deduplication.
    #[must_use]
    #[inline]
    pub fn dedup_rate(&self) -> f64 {
        if self.total_requests == 0 {
            0.0
        } else {
            (self.deduplicated_requests as f64 / self.total_requests as f64) * 100.0
        }
    }
}

/// Channel used to deliver a request's result back to its caller.
type ResponseSender = oneshot::Sender<Result<PipelineResponse, PipelineError>>;

/// A queued request together with the channel used to answer it.
struct PendingRequest {
    request: PipelineRequest,
    response_tx: ResponseSender,
    queued_at: Instant,
}

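/// A batching request pipeline: callers submit requests, a background
/// worker coalesces them into batches, deduplicates identical in-flight
/// work, and fans results back out over oneshot channels.
///
/// Must be created inside a Tokio runtime, since construction spawns the
/// worker task. A minimal usage sketch (the crate name `request_pipeline`
/// is a placeholder, hence `ignore`):
///
/// ```ignore
/// use request_pipeline::{PipelineConfig, PipelineRequest, RequestPipeline};
///
/// #[tokio::main]
/// async fn main() {
///     let pipeline = RequestPipeline::new(PipelineConfig::default());
///     let response = pipeline
///         .submit(PipelineRequest::new("echo", b"hello".to_vec()))
///         .await
///         .expect("pipeline response");
///     assert!(response.is_success());
/// }
/// ```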
pub struct RequestPipeline {
    config: PipelineConfig,
    request_tx: mpsc::Sender<PendingRequest>,
    stats: Arc<RwLock<PipelineStats>>,
    _worker_handle: tokio::task::JoinHandle<()>,
}

impl RequestPipeline {
    /// Spawns the background worker; must be called from within a Tokio
    /// runtime.
    pub fn new(config: PipelineConfig) -> Self {
        let (request_tx, request_rx) = mpsc::channel(MAX_PENDING_REQUESTS);
        let stats = Arc::new(RwLock::new(PipelineStats::default()));

        let worker_handle = tokio::spawn(Self::pipeline_worker(
            config.clone(),
            request_rx,
            Arc::clone(&stats),
        ));

        Self {
            config,
            request_tx,
            stats,
            _worker_handle: worker_handle,
        }
    }

    /// Submits a single request and waits for its response.
    pub async fn submit(
        &self,
        request: PipelineRequest,
    ) -> Result<PipelineResponse, PipelineError> {
        let response_rx = self.enqueue(request).await?;
        response_rx.await.map_err(|_| PipelineError::Cancelled)?
    }

    /// Submits many requests and waits for all of their responses.
    ///
    /// Every request is enqueued before any response is awaited, so the
    /// worker can coalesce them into shared batches; awaiting each
    /// `submit` in turn would produce one batch per request.
    pub async fn submit_batch(
        &self,
        requests: Vec<PipelineRequest>,
    ) -> Vec<Result<PipelineResponse, PipelineError>> {
        let mut receivers = Vec::with_capacity(requests.len());
        for request in requests {
            receivers.push(self.enqueue(request).await);
        }

        let mut results = Vec::with_capacity(receivers.len());
        for receiver in receivers {
            results.push(match receiver {
                Ok(rx) => rx.await.unwrap_or(Err(PipelineError::Cancelled)),
                Err(err) => Err(err),
            });
        }
        results
    }

    /// Queues one request and returns the channel its response will
    /// arrive on.
    async fn enqueue(
        &self,
        request: PipelineRequest,
    ) -> Result<oneshot::Receiver<Result<PipelineResponse, PipelineError>>, PipelineError> {
        let (response_tx, response_rx) = oneshot::channel();

        let pending = PendingRequest {
            request,
            response_tx,
            queued_at: Instant::now(),
        };

        self.request_tx
            .send(pending)
            .await
            .map_err(|_| PipelineError::ShuttingDown)?;

        let mut stats = self.stats.write().await;
        stats.total_requests += 1;
        // Queue depth is inferred from how much channel capacity is in use.
        stats.queue_depth = self.request_tx.max_capacity() - self.request_tx.capacity();

        Ok(response_rx)
    }

    /// Returns a snapshot of the pipeline statistics.
    pub async fn stats(&self) -> PipelineStats {
        self.stats.read().await.clone()
    }

    #[must_use]
    pub fn config(&self) -> &PipelineConfig {
        &self.config
    }

    /// Background worker: drains the request channel, groups requests
    /// into batches, and flushes a batch when it fills up or when its
    /// oldest request has waited `batch_timeout_ms`.
    async fn pipeline_worker(
        config: PipelineConfig,
        mut request_rx: mpsc::Receiver<PendingRequest>,
        stats: Arc<RwLock<PipelineStats>>,
    ) {
        let mut batch: Vec<PendingRequest> = Vec::with_capacity(config.max_batch_size);
        let batch_timeout = Duration::from_millis(config.batch_timeout_ms);
        let semaphore = Arc::new(Semaphore::new(config.max_concurrent));

        // Flush deadline, anchored to the first request of the current
        // batch so a steady trickle of arrivals cannot postpone the flush
        // indefinitely.
        let mut batch_deadline: Option<tokio::time::Instant> = None;

        // Waiters whose request duplicated one already in the current
        // batch; they receive a copy of the leader's result.
        let mut dedup_map: HashMap<String, Vec<ResponseSender>> = HashMap::new();

        loop {
            let deadline = batch_deadline
                .unwrap_or_else(|| tokio::time::Instant::now() + batch_timeout);
            let timeout = tokio::time::sleep_until(deadline);
            tokio::pin!(timeout);

            tokio::select! {
                Some(pending) = request_rx.recv() => {
                    // Reject requests that overstayed their queue-time budget.
                    if pending.queued_at.elapsed() > Duration::from_millis(config.max_queue_time_ms) {
                        let _ = pending.response_tx.send(Err(PipelineError::RequestTimeout(config.max_queue_time_ms)));
                        continue;
                    }

                    if config.enable_deduplication {
                        let dedup_key = Self::dedup_key(&pending.request);
                        if let Some(waiters) = dedup_map.get_mut(&dedup_key) {
                            waiters.push(pending.response_tx);
                            stats.write().await.deduplicated_requests += 1;
                            continue;
                        }
                        dedup_map.insert(dedup_key, Vec::new());
                    }

                    if batch.is_empty() {
                        batch_deadline = Some(tokio::time::Instant::now() + batch_timeout);
                    }
                    batch.push(pending);

                    if batch.len() >= config.max_batch_size {
                        let current_batch = std::mem::replace(&mut batch, Vec::with_capacity(config.max_batch_size));
                        let waiters = std::mem::take(&mut dedup_map);
                        batch_deadline = None;
                        // Run the batch on its own task; the semaphore caps
                        // how many batches execute concurrently.
                        tokio::spawn(Self::process_batch(
                            current_batch,
                            waiters,
                            Arc::clone(&semaphore),
                            Arc::clone(&stats),
                        ));
                    }
                }
                () = &mut timeout, if !batch.is_empty() => {
                    let current_batch = std::mem::replace(&mut batch, Vec::with_capacity(config.max_batch_size));
                    let waiters = std::mem::take(&mut dedup_map);
                    batch_deadline = None;
                    tokio::spawn(Self::process_batch(
                        current_batch,
                        waiters,
                        Arc::clone(&semaphore),
                        Arc::clone(&stats),
                    ));
                }
                else => break,
            }
        }
    }

    /// Deduplication key: requests with the same operation and payload
    /// collapse into one execution (assumes the `hex` crate dependency).
    fn dedup_key(request: &PipelineRequest) -> String {
        format!("{}:{}", request.operation, hex::encode(&request.payload))
    }

    /// Executes each request in the batch, fans results out to any
    /// deduplicated waiters, and updates the rolling statistics.
    async fn process_batch(
        batch: Vec<PendingRequest>,
        mut waiters: HashMap<String, Vec<ResponseSender>>,
        semaphore: Arc<Semaphore>,
        stats: Arc<RwLock<PipelineStats>>,
    ) {
        let batch_size = batch.len();

        // The permit is held for the whole batch, bounding how many
        // batches execute at once.
        let _permit = semaphore.acquire().await.expect("Semaphore closed");

        for pending in batch {
            let start_time = Instant::now();

            let response = Self::execute_request(&pending.request).await;

            let processing_time_ms = start_time.elapsed().as_millis() as u64;

            let result = response.map(|mut resp| {
                resp.processing_time_ms = processing_time_ms;
                resp
            });

            {
                let mut stats = stats.write().await;
                match &result {
                    Ok(_) => stats.successful_requests += 1,
                    Err(_) => stats.failed_requests += 1,
                }

                // Incremental running average of per-request latency.
                let processed = (stats.successful_requests + stats.failed_requests) as f64;
                stats.avg_latency_ms =
                    (stats.avg_latency_ms * (processed - 1.0) + processing_time_ms as f64)
                        / processed;
            }

            // Hand a copy of the result to every deduplicated waiter.
            if let Some(channels) = waiters.remove(&Self::dedup_key(&pending.request)) {
                for waiter in channels {
                    let _ = waiter.send(result.clone());
                }
            }

            let _ = pending.response_tx.send(result);
        }

        // Defensive: cancel any waiters whose leader never ran rather
        // than leaving them hanging.
        for channels in waiters.into_values() {
            for waiter in channels {
                let _ = waiter.send(Err(PipelineError::Cancelled));
            }
        }

        {
            let mut stats = stats.write().await;
            stats.total_batches += 1;
            // Incremental running average of batch size.
            let total_batch_size = stats.avg_batch_size * (stats.total_batches - 1) as f64;
            stats.avg_batch_size =
                (total_batch_size + batch_size as f64) / stats.total_batches as f64;
        }
    }

    /// Placeholder executor: simulates work with a short sleep and
    /// returns an empty successful response. Real dispatch on
    /// `request.operation` would live here.
    async fn execute_request(request: &PipelineRequest) -> Result<PipelineResponse, PipelineError> {
        // Simulated processing latency.
        tokio::time::sleep(Duration::from_millis(10)).await;

        Ok(PipelineResponse {
            request_id: request.request_id.clone(),
            success: true,
            payload: vec![],
            error: None,
            // Overwritten with the measured latency by `process_batch`.
            processing_time_ms: 0,
        })
    }
}

/// Generates a unique request id from the current timestamp and a
/// process-wide atomic counter.
fn generate_request_id() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let id = COUNTER.fetch_add(1, Ordering::SeqCst);
    format!("req-{}-{}", current_timestamp_ms(), id)
}

/// Milliseconds since the Unix epoch (0 if the system clock predates it).
fn current_timestamp_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pipeline_config_default() {
        let config = PipelineConfig::default();
        assert_eq!(config.max_batch_size, 50);
        assert_eq!(config.max_concurrent, 10);
        assert_eq!(config.batch_timeout_ms, DEFAULT_BATCH_TIMEOUT_MS);
        assert!(config.enable_deduplication);
    }

    #[test]
    fn test_request_priority_order() {
        assert!(RequestPriority::Critical > RequestPriority::High);
        assert!(RequestPriority::High > RequestPriority::Normal);
        assert!(RequestPriority::Normal > RequestPriority::Low);
    }

    #[test]
    fn test_pipeline_request_creation() {
        let request = PipelineRequest::new("test_op", vec![1, 2, 3]);
        assert_eq!(request.operation, "test_op");
        assert_eq!(request.payload, vec![1, 2, 3]);
        assert_eq!(request.priority, RequestPriority::Normal);
    }

    #[test]
    fn test_pipeline_request_with_priority() {
        let request = PipelineRequest::new("test_op", vec![]).with_priority(RequestPriority::High);
        assert_eq!(request.priority, RequestPriority::High);
    }

    #[test]
    fn test_pipeline_request_with_id() {
        let request =
            PipelineRequest::new("test_op", vec![]).with_request_id("custom-id".to_string());
        assert_eq!(request.request_id, "custom-id");
    }

    #[tokio::test]
    async fn test_pipeline_creation() {
        let config = PipelineConfig::default();
        let _pipeline = RequestPipeline::new(config);
    }

    #[tokio::test]
    async fn test_pipeline_submit_single_request() {
        let config = PipelineConfig::default();
        let pipeline = RequestPipeline::new(config);

        let request = PipelineRequest::new("test", vec![1, 2, 3]);
        let response = pipeline.submit(request).await;

        assert!(response.is_ok());
        let response = response.unwrap();
        assert!(response.success);
    }

    #[tokio::test]
    async fn test_pipeline_submit_batch() {
        let config = PipelineConfig::default();
        let pipeline = RequestPipeline::new(config);

        let requests = vec![
            PipelineRequest::new("test1", vec![1]),
            PipelineRequest::new("test2", vec![2]),
            PipelineRequest::new("test3", vec![3]),
        ];

        let responses = pipeline.submit_batch(requests).await;

        assert_eq!(responses.len(), 3);
        for response in responses {
            assert!(response.is_ok());
        }
    }

    #[tokio::test]
    async fn test_pipeline_stats() {
        let config = PipelineConfig::default();
        let pipeline = RequestPipeline::new(config);

        let request = PipelineRequest::new("test", vec![1, 2, 3]);
        let _ = pipeline.submit(request).await;

        tokio::time::sleep(Duration::from_millis(100)).await;

        let stats = pipeline.stats().await;
        assert_eq!(stats.total_requests, 1);
    }

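    // A sketch of the deduplication path: two identical requests submitted
    // concurrently should collapse into a single execution, with the
    // duplicate receiving a copy of the leader's response. Relies on both
    // submissions landing inside the same batch window.
    #[tokio::test]
    async fn test_pipeline_deduplicates_identical_requests() {
        let pipeline = RequestPipeline::new(PipelineConfig::default());

        let first = PipelineRequest::new("dup_op", vec![9]);
        let second = PipelineRequest::new("dup_op", vec![9]);

        let (r1, r2) = tokio::join!(pipeline.submit(first), pipeline.submit(second));
        assert!(r1.is_ok());
        assert!(r2.is_ok());

        let stats = pipeline.stats().await;
        assert_eq!(stats.deduplicated_requests, 1);
    }
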
    #[test]
    fn test_generate_request_id_uniqueness() {
        let id1 = generate_request_id();
        let id2 = generate_request_id();
        assert_ne!(id1, id2);
    }

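    // Sanity-checks the derived rate helpers against hand-built counters;
    // the expected values follow directly from the formulas in
    // `PipelineStats` (e.g. 3 successes out of 4 processed = 75%).
    #[test]
    fn test_pipeline_stats_rates() {
        let stats = PipelineStats {
            total_requests: 4,
            successful_requests: 3,
            failed_requests: 1,
            deduplicated_requests: 1,
            ..PipelineStats::default()
        };
        assert_eq!(stats.success_rate(), 75.0);
        assert_eq!(stats.failure_rate(), 25.0);
        assert_eq!(stats.dedup_rate(), 25.0);

        // With nothing processed yet, the rates fall back to zero.
        let empty = PipelineStats::default();
        assert_eq!(empty.success_rate(), 0.0);
        assert_eq!(empty.dedup_rate(), 0.0);
    }
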
    #[test]
    fn test_request_priority_default() {
        let priority = RequestPriority::default();
        assert_eq!(priority, RequestPriority::Normal);
    }
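
    // Exercises the response convenience accessors on hand-built values;
    // a small sketch covering both the success and failure paths.
    #[test]
    fn test_pipeline_response_helpers() {
        let ok = PipelineResponse {
            request_id: "r1".to_string(),
            success: true,
            payload: vec![1],
            error: None,
            processing_time_ms: 5,
        };
        assert!(ok.is_success());
        assert!(!ok.is_failure());
        assert_eq!(ok.error_message(), None);

        let err = PipelineResponse {
            request_id: "r2".to_string(),
            success: false,
            payload: vec![],
            error: Some("boom".to_string()),
            processing_time_ms: 5,
        };
        assert!(err.is_failure());
        assert_eq!(err.error_message(), Some("boom"));
    }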
}