chie_core/
request_pipeline.rs

//! Request pipelining for efficient batch operations.
//!
//! This module provides request pipelining to batch multiple API requests
//! efficiently, reducing latency and improving throughput when making
//! many requests to the coordinator or other services.
//!
//! # Features
//!
//! - **Batching**: Group multiple requests into batches
//! - **Concurrency Control**: Limit concurrent requests
//! - **Priority Queues**: Prioritize critical requests
//! - **Automatic Retry**: Retry failed requests in batch
//! - **Request Coalescing**: Merge duplicate requests
//! - **Statistics Tracking**: Monitor pipeline performance
//!
//! # Example
//!
//! ```
//! use chie_core::request_pipeline::{RequestPipeline, PipelineConfig, PipelineRequest};
//! use std::sync::Arc;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let config = PipelineConfig {
//!     max_batch_size: 50,
//!     max_concurrent: 10,
//!     batch_timeout_ms: 100,
//!     ..Default::default()
//! };
//!
//! let pipeline = Arc::new(RequestPipeline::new(config));
//!
//! // Submit requests to the pipeline
//! let request = PipelineRequest::new("submit_proof", vec![1, 2, 3]);
//! let response = pipeline.submit(request).await?;
//!
//! println!("Response: {:?}", response);
//! # Ok(())
//! # }
//! ```

use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use thiserror::Error;
use tokio::sync::{RwLock, Semaphore, mpsc, oneshot};

/// Maximum number of pending requests in the pipeline.
const MAX_PENDING_REQUESTS: usize = 10_000;

/// Default batch timeout in milliseconds.
const DEFAULT_BATCH_TIMEOUT_MS: u64 = 100;

/// Errors that can occur during request pipelining.
#[derive(Debug, Error)]
pub enum PipelineError {
    #[error("Pipeline is full, cannot accept more requests")]
    PipelineFull,

    #[error("Request timeout after {0}ms")]
    RequestTimeout(u64),

    #[error("Batch execution failed: {0}")]
    BatchFailed(String),

    #[error("Request cancelled")]
    Cancelled,

    #[error("Invalid request: {0}")]
    InvalidRequest(String),

    #[error("Pipeline is shutting down")]
    ShuttingDown,
}

/// Priority level for requests.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum RequestPriority {
    /// Low priority (best effort).
    Low = 0,
    /// Normal priority (default).
    Normal = 1,
    /// High priority (expedited processing).
    High = 2,
    /// Critical priority (immediate processing).
    Critical = 3,
}

impl Default for RequestPriority {
    #[inline]
    fn default() -> Self {
        Self::Normal
    }
}

/// A single request in the pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineRequest {
    /// Request operation name.
    pub operation: String,
    /// Request payload (arbitrary bytes).
    pub payload: Vec<u8>,
    /// Request priority.
    pub priority: RequestPriority,
    /// Request ID for tracking.
    pub request_id: String,
    /// Timestamp when request was created.
    pub created_at_ms: u64,
}

impl PipelineRequest {
    /// Create a new pipeline request.
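    ///
    /// The request defaults to [`RequestPriority::Normal`] with a generated ID;
    /// both can be overridden with the builder-style setters (illustrative
    /// operation name and ID below):
    ///
    /// ```
    /// use chie_core::request_pipeline::{PipelineRequest, RequestPriority};
    ///
    /// let request = PipelineRequest::new("submit_proof", vec![1, 2, 3])
    ///     .with_priority(RequestPriority::High)
    ///     .with_request_id("proof-42".to_string());
    ///
    /// assert_eq!(request.operation, "submit_proof");
    /// assert_eq!(request.priority, RequestPriority::High);
    /// assert_eq!(request.request_id, "proof-42");
    /// ```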
    #[must_use]
    pub fn new(operation: impl Into<String>, payload: Vec<u8>) -> Self {
        Self {
            operation: operation.into(),
            payload,
            priority: RequestPriority::Normal,
            request_id: generate_request_id(),
            created_at_ms: current_timestamp_ms(),
        }
    }

    /// Set the priority of this request.
    #[must_use]
    pub fn with_priority(mut self, priority: RequestPriority) -> Self {
        self.priority = priority;
        self
    }

    /// Set the request ID.
    #[must_use]
    pub fn with_request_id(mut self, id: String) -> Self {
        self.request_id = id;
        self
    }
}

/// Response from a pipelined request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineResponse {
    /// Request ID that this response corresponds to.
    pub request_id: String,
    /// Whether the request succeeded.
    pub success: bool,
    /// Response payload.
    pub payload: Vec<u8>,
    /// Error message if request failed.
    pub error: Option<String>,
    /// Processing time in milliseconds.
    pub processing_time_ms: u64,
}

impl PipelineResponse {
    /// Check if the response indicates success.
    #[must_use]
    #[inline]
    pub const fn is_success(&self) -> bool {
        self.success
    }

    /// Check if the response indicates failure.
    #[must_use]
    #[inline]
    pub const fn is_failure(&self) -> bool {
        !self.success
    }

    /// Get the error message if the response failed.
    #[must_use]
    #[inline]
    pub fn error_message(&self) -> Option<&str> {
        self.error.as_deref()
    }
}

/// Configuration for the request pipeline.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Maximum number of requests in a single batch.
    pub max_batch_size: usize,
    /// Maximum number of concurrent batch executions.
    pub max_concurrent: usize,
    /// Time to wait before processing a partial batch (ms).
    pub batch_timeout_ms: u64,
    /// Maximum time a request can wait in queue (ms).
    pub max_queue_time_ms: u64,
    /// Enable request deduplication.
    pub enable_deduplication: bool,
}

impl Default for PipelineConfig {
    #[inline]
    fn default() -> Self {
        Self {
            max_batch_size: 50,
            max_concurrent: 10,
            batch_timeout_ms: DEFAULT_BATCH_TIMEOUT_MS,
            max_queue_time_ms: 5_000,
            enable_deduplication: true,
        }
    }
}

/// Statistics for the request pipeline.
#[derive(Debug, Clone, Default)]
pub struct PipelineStats {
    /// Total requests submitted.
    pub total_requests: u64,
    /// Total requests successfully processed.
    pub successful_requests: u64,
    /// Total requests that failed.
    pub failed_requests: u64,
    /// Total requests deduplicated.
    pub deduplicated_requests: u64,
    /// Total batches processed.
    pub total_batches: u64,
    /// Average batch size.
    pub avg_batch_size: f64,
    /// Average request latency (ms).
    pub avg_latency_ms: f64,
    /// Current queue depth.
    pub queue_depth: usize,
}

impl PipelineStats {
    /// Calculate the success rate as a percentage.
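    ///
    /// Returns `0.0` when no requests have been processed yet. A minimal
    /// illustration using only the public counters:
    ///
    /// ```
    /// use chie_core::request_pipeline::PipelineStats;
    ///
    /// let stats = PipelineStats {
    ///     successful_requests: 9,
    ///     failed_requests: 1,
    ///     ..Default::default()
    /// };
    /// assert!((stats.success_rate() - 90.0).abs() < 1e-9);
    /// ```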
    #[must_use]
    #[inline]
    pub fn success_rate(&self) -> f64 {
        let total_processed = self.successful_requests + self.failed_requests;
        if total_processed == 0 {
            0.0
        } else {
            (self.successful_requests as f64 / total_processed as f64) * 100.0
        }
    }

    /// Calculate the failure rate as a percentage.
    #[must_use]
    #[inline]
    pub fn failure_rate(&self) -> f64 {
        100.0 - self.success_rate()
    }

    /// Calculate the deduplication rate as a percentage.
    #[must_use]
    #[inline]
    pub fn dedup_rate(&self) -> f64 {
        if self.total_requests == 0 {
            0.0
        } else {
            (self.deduplicated_requests as f64 / self.total_requests as f64) * 100.0
        }
    }
}

/// Internal pending request with completion channel.
struct PendingRequest {
    request: PipelineRequest,
    response_tx: oneshot::Sender<Result<PipelineResponse, PipelineError>>,
    queued_at: Instant,
}

/// Request pipeline for batching and concurrent execution.
pub struct RequestPipeline {
    config: PipelineConfig,
    request_tx: mpsc::Sender<PendingRequest>,
    stats: Arc<RwLock<PipelineStats>>,
    _worker_handle: tokio::task::JoinHandle<()>,
}

impl RequestPipeline {
    /// Create a new request pipeline.
    ///
    /// This spawns the background worker task, so it must be called from within
    /// a Tokio runtime.
    ///
    /// # Panics
    ///
    /// Panics if called outside of a Tokio runtime context.
    pub fn new(config: PipelineConfig) -> Self {
        let (request_tx, request_rx) = mpsc::channel(MAX_PENDING_REQUESTS);
        let stats = Arc::new(RwLock::new(PipelineStats::default()));

        let worker_handle = tokio::spawn(Self::pipeline_worker(
            config.clone(),
            request_rx,
            Arc::clone(&stats),
        ));

        Self {
            config,
            request_tx,
            stats,
            _worker_handle: worker_handle,
        }
    }

    /// Submit a request to the pipeline.
    pub async fn submit(
        &self,
        request: PipelineRequest,
    ) -> Result<PipelineResponse, PipelineError> {
        let (response_tx, response_rx) = oneshot::channel();

        let pending = PendingRequest {
            request,
            response_tx,
            queued_at: Instant::now(),
        };

        self.request_tx
            .send(pending)
            .await
            .map_err(|_| PipelineError::ShuttingDown)?;

        // Update stats
        {
            let mut stats = self.stats.write().await;
            stats.total_requests += 1;
            stats.queue_depth = self.request_tx.max_capacity() - self.request_tx.capacity();
        }

        response_rx.await.map_err(|_| PipelineError::Cancelled)?
    }

    /// Submit multiple requests and collect their responses.
    ///
    /// Requests are submitted one at a time and each response is awaited before
    /// the next request is sent, so results are returned in submission order.
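    ///
    /// # Example
    ///
    /// A minimal sketch with illustrative operation names (compile-checked only):
    ///
    /// ```
    /// # use chie_core::request_pipeline::{RequestPipeline, PipelineConfig, PipelineRequest};
    /// # async fn example() {
    /// let pipeline = RequestPipeline::new(PipelineConfig::default());
    ///
    /// let requests = vec![
    ///     PipelineRequest::new("op_a", vec![1]),
    ///     PipelineRequest::new("op_b", vec![2]),
    /// ];
    ///
    /// let responses = pipeline.submit_batch(requests).await;
    /// assert_eq!(responses.len(), 2);
    /// # }
    /// ```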
    pub async fn submit_batch(
        &self,
        requests: Vec<PipelineRequest>,
    ) -> Vec<Result<PipelineResponse, PipelineError>> {
        let mut results = Vec::with_capacity(requests.len());

        for request in requests {
            let result = self.submit(request).await;
            results.push(result);
        }

        results
    }

    /// Get pipeline statistics.
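    ///
    /// A minimal usage sketch (compile-checked only):
    ///
    /// ```
    /// # use chie_core::request_pipeline::{RequestPipeline, PipelineConfig};
    /// # async fn example() {
    /// let pipeline = RequestPipeline::new(PipelineConfig::default());
    /// let stats = pipeline.stats().await;
    /// println!("success rate: {:.1}%", stats.success_rate());
    /// # }
    /// ```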
    pub async fn stats(&self) -> PipelineStats {
        self.stats.read().await.clone()
    }

    /// Get pipeline configuration.
    #[must_use]
    pub fn config(&self) -> &PipelineConfig {
        &self.config
    }

    /// Pipeline worker task that batches and processes requests.
    async fn pipeline_worker(
        config: PipelineConfig,
        mut request_rx: mpsc::Receiver<PendingRequest>,
        stats: Arc<RwLock<PipelineStats>>,
    ) {
        let mut batch: Vec<PendingRequest> = Vec::with_capacity(config.max_batch_size);
        let mut last_batch_time = Instant::now();
        let batch_timeout = Duration::from_millis(config.batch_timeout_ms);
        let semaphore = Arc::new(Semaphore::new(config.max_concurrent));

        // Deduplication map: operation + hex-encoded payload -> response channels of
        // duplicate requests coalesced onto the first one seen.
        let mut dedup_map: HashMap<
            String,
            Vec<oneshot::Sender<Result<PipelineResponse, PipelineError>>>,
        > = HashMap::new();

        loop {
            // Try to fill the batch
            let timeout = tokio::time::sleep(batch_timeout);
            tokio::pin!(timeout);

            tokio::select! {
                Some(pending) = request_rx.recv() => {
                    // Check for timeout
                    if pending.queued_at.elapsed() > Duration::from_millis(config.max_queue_time_ms) {
                        let _ = pending.response_tx.send(Err(PipelineError::RequestTimeout(config.max_queue_time_ms)));
                        continue;
                    }

                    // Check for deduplication
                    if config.enable_deduplication {
                        let dedup_key = format!("{}:{}", pending.request.operation, hex::encode(&pending.request.payload));
                        if let Some(channels) = dedup_map.get_mut(&dedup_key) {
                            channels.push(pending.response_tx);
                            let mut stats = stats.write().await;
                            stats.deduplicated_requests += 1;
                            continue;
                        } else {
                            dedup_map.insert(dedup_key, vec![]);
                        }
                    }

                    batch.push(pending);

                    // Process batch if full
                    if batch.len() >= config.max_batch_size {
                        let current_batch = std::mem::replace(&mut batch, Vec::with_capacity(config.max_batch_size));
                        // Hand the coalesced duplicate waiters to the batch so their
                        // callers receive the shared result instead of a dropped channel.
                        let waiters = std::mem::take(&mut dedup_map);
                        // Spawn rather than await so up to `max_concurrent` batches can
                        // run in parallel, bounded by the semaphore inside `process_batch`.
                        tokio::spawn(Self::process_batch(
                            current_batch,
                            waiters,
                            Arc::clone(&semaphore),
                            Arc::clone(&stats),
                        ));
                        last_batch_time = Instant::now();
                    }
                }
                () = &mut timeout, if !batch.is_empty() => {
                    // Process partial batch after timeout
                    if last_batch_time.elapsed() >= batch_timeout && !batch.is_empty() {
                        let current_batch = std::mem::replace(&mut batch, Vec::with_capacity(config.max_batch_size));
                        let waiters = std::mem::take(&mut dedup_map);
                        tokio::spawn(Self::process_batch(
                            current_batch,
                            waiters,
                            Arc::clone(&semaphore),
                            Arc::clone(&stats),
                        ));
                        last_batch_time = Instant::now();
                    }
                }
                else => break,
            }
        }
    }

    /// Process a batch of requests, forwarding each result to any coalesced
    /// duplicate waiters for the same operation and payload.
    async fn process_batch(
        batch: Vec<PendingRequest>,
        mut dedup_waiters: HashMap<
            String,
            Vec<oneshot::Sender<Result<PipelineResponse, PipelineError>>>,
        >,
        semaphore: Arc<Semaphore>,
        stats: Arc<RwLock<PipelineStats>>,
    ) {
        let batch_size = batch.len();
        let batch_start = Instant::now();

        // Acquire semaphore permit for concurrency control
        let _permit = semaphore.acquire().await.expect("Semaphore closed");

        // Process each request in the batch
        for pending in batch {
            let start_time = Instant::now();

            // Simulate request processing (in real implementation, this would call the actual handler)
            let response = Self::execute_request(&pending.request).await;

            let processing_time_ms = start_time.elapsed().as_millis() as u64;

            let result = response.map(|mut resp| {
                resp.processing_time_ms = processing_time_ms;
                resp
            });

            // Update stats
            {
                let mut stats = stats.write().await;
                match &result {
                    Ok(_) => stats.successful_requests += 1,
                    Err(_) => stats.failed_requests += 1,
                }

                // Update average latency
                let total_latency = stats.avg_latency_ms
                    * (stats.successful_requests + stats.failed_requests - 1) as f64;
                stats.avg_latency_ms = (total_latency + processing_time_ms as f64)
                    / (stats.successful_requests + stats.failed_requests) as f64;
            }

            // Fan the shared result out to callers whose duplicate requests were
            // coalesced onto this one. The key format mirrors `pipeline_worker`.
            let dedup_key = format!(
                "{}:{}",
                pending.request.operation,
                hex::encode(&pending.request.payload)
            );
            if let Some(waiters) = dedup_waiters.remove(&dedup_key) {
                for waiter in waiters {
                    let duplicate_result = match &result {
                        Ok(resp) => Ok(resp.clone()),
                        Err(e) => Err(PipelineError::BatchFailed(e.to_string())),
                    };
                    let _ = waiter.send(duplicate_result);
                }
            }

            // Send response to the original caller
            let _ = pending.response_tx.send(result);
        }

        // Update batch stats
        {
            let mut stats = stats.write().await;
            stats.total_batches += 1;
            let total_batch_size = stats.avg_batch_size * (stats.total_batches - 1) as f64;
            stats.avg_batch_size =
                (total_batch_size + batch_size as f64) / stats.total_batches as f64;
        }

        let _batch_duration = batch_start.elapsed();
    }

    /// Execute a single request (placeholder for actual implementation).
    async fn execute_request(request: &PipelineRequest) -> Result<PipelineResponse, PipelineError> {
        // This is a placeholder. In a real implementation, this would:
        // 1. Call the appropriate handler based on request.operation
        // 2. Process the request.payload
        // 3. Return actual response data

        // Simulate processing time
        tokio::time::sleep(Duration::from_millis(10)).await;

        Ok(PipelineResponse {
            request_id: request.request_id.clone(),
            success: true,
            payload: vec![],
            error: None,
            processing_time_ms: 0, // Will be set by caller
        })
    }
}

/// Generate a unique request ID.
fn generate_request_id() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let id = COUNTER.fetch_add(1, Ordering::SeqCst);
    format!("req-{}-{}", current_timestamp_ms(), id)
}

/// Get current timestamp in milliseconds.
fn current_timestamp_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pipeline_config_default() {
        let config = PipelineConfig::default();
        assert_eq!(config.max_batch_size, 50);
        assert_eq!(config.max_concurrent, 10);
        assert_eq!(config.batch_timeout_ms, DEFAULT_BATCH_TIMEOUT_MS);
        assert!(config.enable_deduplication);
    }

    #[test]
    fn test_request_priority_order() {
        assert!(RequestPriority::Critical > RequestPriority::High);
        assert!(RequestPriority::High > RequestPriority::Normal);
        assert!(RequestPriority::Normal > RequestPriority::Low);
    }

    #[test]
    fn test_pipeline_request_creation() {
        let request = PipelineRequest::new("test_op", vec![1, 2, 3]);
        assert_eq!(request.operation, "test_op");
        assert_eq!(request.payload, vec![1, 2, 3]);
        assert_eq!(request.priority, RequestPriority::Normal);
    }

    #[test]
    fn test_pipeline_request_with_priority() {
        let request = PipelineRequest::new("test_op", vec![]).with_priority(RequestPriority::High);
        assert_eq!(request.priority, RequestPriority::High);
    }

    #[test]
    fn test_pipeline_request_with_id() {
        let request =
            PipelineRequest::new("test_op", vec![]).with_request_id("custom-id".to_string());
        assert_eq!(request.request_id, "custom-id");
    }

    #[tokio::test]
    async fn test_pipeline_creation() {
        let config = PipelineConfig::default();
        let _pipeline = RequestPipeline::new(config);
        // Pipeline created successfully
    }

    #[tokio::test]
    async fn test_pipeline_submit_single_request() {
        let config = PipelineConfig::default();
        let pipeline = RequestPipeline::new(config);

        let request = PipelineRequest::new("test", vec![1, 2, 3]);
        let response = pipeline.submit(request).await;

        assert!(response.is_ok());
        let response = response.unwrap();
        assert!(response.success);
    }

    #[tokio::test]
    async fn test_pipeline_submit_batch() {
        let config = PipelineConfig::default();
        let pipeline = RequestPipeline::new(config);

        let requests = vec![
            PipelineRequest::new("test1", vec![1]),
            PipelineRequest::new("test2", vec![2]),
            PipelineRequest::new("test3", vec![3]),
        ];

        let responses = pipeline.submit_batch(requests).await;

        assert_eq!(responses.len(), 3);
        for response in responses {
            assert!(response.is_ok());
        }
    }

    #[tokio::test]
    async fn test_pipeline_stats() {
        let config = PipelineConfig::default();
        let pipeline = RequestPipeline::new(config);

        let request = PipelineRequest::new("test", vec![1, 2, 3]);
        let _ = pipeline.submit(request).await;

        // Give the worker time to process
        tokio::time::sleep(Duration::from_millis(100)).await;

        let stats = pipeline.stats().await;
        assert_eq!(stats.total_requests, 1);
    }

    #[test]
    fn test_generate_request_id_uniqueness() {
        let id1 = generate_request_id();
        let id2 = generate_request_id();
        assert_ne!(id1, id2);
    }

    #[test]
    fn test_request_priority_default() {
        let priority = RequestPriority::default();
        assert_eq!(priority, RequestPriority::Normal);
    }
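
    #[test]
    fn test_pipeline_stats_rates() {
        // Rates are derived purely from the public counters; no running pipeline needed.
        let stats = PipelineStats {
            total_requests: 10,
            successful_requests: 6,
            failed_requests: 2,
            deduplicated_requests: 2,
            ..Default::default()
        };
        assert!((stats.success_rate() - 75.0).abs() < 1e-9);
        assert!((stats.failure_rate() - 25.0).abs() < 1e-9);
        assert!((stats.dedup_rate() - 20.0).abs() < 1e-9);
    }

    #[test]
    fn test_pipeline_stats_rates_empty() {
        let stats = PipelineStats::default();
        assert!((stats.success_rate() - 0.0).abs() < 1e-9);
        assert!((stats.dedup_rate() - 0.0).abs() < 1e-9);
    }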
}