// ipfrs_interface/streaming.rs

1//! Streaming Support for IPFRS Gateway
2//!
3//! Provides:
4//! - Memory-efficient streaming downloads
5//! - Chunked uploads with progress tracking
6//! - Server-Sent Events (SSE) for progress callbacks
7//! - Batch block operations
8
9use axum::{
10    body::Body,
11    extract::{Multipart, Path, Query, State},
12    http::{header, StatusCode},
13    response::{
14        sse::{Event, KeepAlive, Sse},
15        IntoResponse, Response,
16    },
17    Json,
18};
19use bytes::Bytes;
20use futures::stream::{self, Stream, StreamExt};
21use ipfrs_core::{Block, Cid};
22use ipfrs_storage::BlockStoreTrait;
23use serde::{Deserialize, Serialize};
24use std::convert::Infallible;
25use std::time::Duration;
26use tokio::sync::broadcast;
27use tracing::info;
28use uuid::Uuid;
29
30use crate::gateway::GatewayState;
31
32// ============================================================================
33// Flow Control & Concurrency
34// ============================================================================
35
/// Concurrency control configuration for batch operations.
///
/// NOTE: `validate` rejects `max_concurrent_tasks == 0` whenever
/// `parallel_enabled` is set, so "unlimited" via 0 only applies to
/// sequential configurations.
#[derive(Debug, Clone)]
pub struct ConcurrencyConfig {
    /// Maximum number of concurrent tasks for batch operations (0 = unlimited)
    pub max_concurrent_tasks: usize,
    /// Enable parallel processing
    pub parallel_enabled: bool,
}

impl Default for ConcurrencyConfig {
    fn default() -> Self {
        // 100 parallel tasks is a reasonable middle ground for batch work.
        Self {
            max_concurrent_tasks: 100,
            parallel_enabled: true,
        }
    }
}

impl ConcurrencyConfig {
    /// Conservative preset: half the default concurrency.
    pub fn conservative() -> Self {
        Self {
            max_concurrent_tasks: 50,
            ..Self::default()
        }
    }

    /// Aggressive preset: double the default concurrency.
    pub fn aggressive() -> Self {
        Self {
            max_concurrent_tasks: 200,
            ..Self::default()
        }
    }

    /// Sequential preset: one task at a time, parallelism disabled.
    pub fn sequential() -> Self {
        Self {
            max_concurrent_tasks: 1,
            parallel_enabled: false,
        }
    }

    /// Validate the configuration: a zero task budget is incompatible with
    /// parallel execution.
    pub fn validate(&self) -> Result<(), String> {
        match (self.max_concurrent_tasks, self.parallel_enabled) {
            (0, true) => Err(
                "max_concurrent_tasks cannot be 0 when parallel_enabled is true".to_string(),
            ),
            _ => Ok(()),
        }
    }
}
89
90/// Flow control configuration for streaming operations
91#[derive(Debug, Clone)]
92pub struct FlowControlConfig {
93    /// Maximum bytes per second (0 = unlimited)
94    pub max_bytes_per_second: u64,
95    /// Initial window size in bytes
96    pub initial_window_size: usize,
97    /// Maximum window size in bytes
98    pub max_window_size: usize,
99    /// Minimum window size in bytes
100    pub min_window_size: usize,
101    /// Enable dynamic window adjustment
102    pub dynamic_adjustment: bool,
103}
104
105impl Default for FlowControlConfig {
106    fn default() -> Self {
107        Self {
108            max_bytes_per_second: 0,         // Unlimited
109            initial_window_size: 256 * 1024, // 256KB
110            max_window_size: 1024 * 1024,    // 1MB
111            min_window_size: 64 * 1024,      // 64KB
112            dynamic_adjustment: true,
113        }
114    }
115}
116
117impl FlowControlConfig {
118    /// Create a flow control config with specific rate limit
119    pub fn with_rate_limit(bytes_per_second: u64) -> Self {
120        Self {
121            max_bytes_per_second: bytes_per_second,
122            ..Default::default()
123        }
124    }
125
126    /// Create a conservative flow control config (smaller windows)
127    pub fn conservative() -> Self {
128        Self {
129            initial_window_size: 64 * 1024,
130            max_window_size: 256 * 1024,
131            min_window_size: 32 * 1024,
132            ..Default::default()
133        }
134    }
135
136    /// Create an aggressive flow control config (larger windows)
137    pub fn aggressive() -> Self {
138        Self {
139            initial_window_size: 512 * 1024,
140            max_window_size: 2 * 1024 * 1024,
141            min_window_size: 128 * 1024,
142            ..Default::default()
143        }
144    }
145
146    /// Validate configuration
147    pub fn validate(&self) -> Result<(), String> {
148        // Min window size must be less than or equal to initial window size
149        if self.min_window_size > self.initial_window_size {
150            return Err(format!(
151                "Minimum window size ({}) cannot exceed initial window size ({})",
152                self.min_window_size, self.initial_window_size
153            ));
154        }
155
156        // Initial window size must be less than or equal to max window size
157        if self.initial_window_size > self.max_window_size {
158            return Err(format!(
159                "Initial window size ({}) cannot exceed maximum window size ({})",
160                self.initial_window_size, self.max_window_size
161            ));
162        }
163
164        // Validate rate limit if set
165        if self.max_bytes_per_second > 0 {
166            validation::validate_rate_limit(self.max_bytes_per_second)?;
167        }
168
169        Ok(())
170    }
171}
172
173/// Flow control state for a streaming operation
174#[derive(Debug)]
175pub struct FlowController {
176    config: FlowControlConfig,
177    current_window_size: usize,
178    bytes_sent: u64,
179    start_time: std::time::Instant,
180    last_adjustment: std::time::Instant,
181}
182
183impl FlowController {
184    /// Create a new flow controller
185    pub fn new(config: FlowControlConfig) -> Self {
186        Self {
187            current_window_size: config.initial_window_size,
188            config,
189            bytes_sent: 0,
190            start_time: std::time::Instant::now(),
191            last_adjustment: std::time::Instant::now(),
192        }
193    }
194
195    /// Get the current window size
196    pub fn window_size(&self) -> usize {
197        self.current_window_size
198    }
199
200    /// Calculate delay needed to respect rate limit
201    pub fn calculate_delay(&self, bytes_to_send: usize) -> std::time::Duration {
202        if self.config.max_bytes_per_second == 0 {
203            return std::time::Duration::from_secs(0);
204        }
205
206        let elapsed = self.start_time.elapsed();
207        let elapsed_secs = elapsed.as_secs_f64();
208
209        if elapsed_secs == 0.0 {
210            return std::time::Duration::from_secs(0);
211        }
212
213        let current_rate = self.bytes_sent as f64 / elapsed_secs;
214        let target_rate = self.config.max_bytes_per_second as f64;
215
216        if current_rate + (bytes_to_send as f64 / elapsed_secs) > target_rate {
217            let delay_secs = (bytes_to_send as f64 / target_rate).max(0.0);
218            std::time::Duration::from_secs_f64(delay_secs)
219        } else {
220            std::time::Duration::from_secs(0)
221        }
222    }
223
224    /// Update flow control state after sending data
225    pub fn on_data_sent(&mut self, bytes: usize) {
226        self.bytes_sent += bytes as u64;
227
228        // Adjust window size if dynamic adjustment is enabled
229        if self.config.dynamic_adjustment {
230            self.adjust_window();
231        }
232    }
233
234    /// Dynamically adjust window size based on performance
235    fn adjust_window(&mut self) {
236        let elapsed = self.last_adjustment.elapsed();
237
238        // Adjust every 100ms
239        if elapsed < std::time::Duration::from_millis(100) {
240            return;
241        }
242
243        self.last_adjustment = std::time::Instant::now();
244
245        // Simple AIMD (Additive Increase Multiplicative Decrease) algorithm
246        // Increase window size by 10% if no issues
247        let new_size = (self.current_window_size as f64 * 1.1)
248            .min(self.config.max_window_size as f64) as usize;
249
250        self.current_window_size =
251            new_size.clamp(self.config.min_window_size, self.config.max_window_size);
252    }
253
254    /// Decrease window size (on congestion)
255    #[allow(dead_code)]
256    pub fn on_congestion(&mut self) {
257        // Multiplicative decrease by 50%
258        let new_size = self.current_window_size / 2;
259        self.current_window_size = new_size.max(self.config.min_window_size);
260        self.last_adjustment = std::time::Instant::now();
261    }
262
263    /// Get current throughput in bytes per second
264    pub fn current_throughput(&self) -> f64 {
265        let elapsed = self.start_time.elapsed().as_secs_f64();
266        if elapsed > 0.0 {
267            self.bytes_sent as f64 / elapsed
268        } else {
269            0.0
270        }
271    }
272}
273
274// ============================================================================
275// Resume/Cancel Support
276// ============================================================================
277
/// Operation state for resume/cancel support
///
/// Tracks where a long-running transfer stands so it can be paused,
/// cancelled, or resumed.
// NOTE(review): no serde derives here, so this type is never sent over the
// wire as-is — presumably kept server-side only; confirm against whatever
// registry stores these.
#[derive(Debug, Clone)]
pub struct OperationState {
    /// Unique operation ID
    pub operation_id: String,
    /// Current byte offset
    pub offset: u64,
    /// Total size (if known)
    pub total_size: Option<u64>,
    /// Operation type
    pub operation_type: OperationType,
    /// Status
    pub status: OperationStatus,
}

/// Operation type
///
/// Serialized in lowercase: "upload" / "download".
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum OperationType {
    Upload,
    Download,
}

/// Operation status
///
/// Serialized in lowercase; multi-word variants collapse to a single word
/// (`InProgress` -> "inprogress").
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum OperationStatus {
    InProgress,
    Paused,
    Cancelled,
    Completed,
    Failed,
}
311
312/// Resume token for continuing operations
313#[derive(Debug, Clone, Serialize, Deserialize)]
314pub struct ResumeToken {
315    /// Operation ID
316    pub operation_id: String,
317    /// Byte offset to resume from
318    pub offset: u64,
319    /// Optional CID for downloads
320    pub cid: Option<String>,
321}
322
323impl ResumeToken {
324    /// Create a new resume token
325    pub fn new(operation_id: String, offset: u64, cid: Option<String>) -> Self {
326        Self {
327            operation_id,
328            offset,
329            cid,
330        }
331    }
332
333    /// Encode resume token to base64
334    pub fn encode(&self) -> Result<String, String> {
335        let json = serde_json::to_string(self).map_err(|e| e.to_string())?;
336        Ok(base64::Engine::encode(
337            &base64::engine::general_purpose::URL_SAFE_NO_PAD,
338            json.as_bytes(),
339        ))
340    }
341
342    /// Decode resume token from base64
343    pub fn decode(encoded: &str) -> Result<Self, String> {
344        let bytes =
345            base64::Engine::decode(&base64::engine::general_purpose::URL_SAFE_NO_PAD, encoded)
346                .map_err(|e| e.to_string())?;
347
348        let json = String::from_utf8(bytes).map_err(|e| e.to_string())?;
349        serde_json::from_str(&json).map_err(|e| e.to_string())
350    }
351}
352
/// Cancel request
#[derive(Debug, Deserialize)]
pub struct CancelRequest {
    /// Operation ID to cancel
    pub operation_id: String,
}

/// Cancel response
#[derive(Debug, Serialize)]
pub struct CancelResponse {
    /// Operation ID that was cancelled
    pub operation_id: String,
    /// Whether cancellation was successful
    pub cancelled: bool,
    /// Optional resume token (see [`ResumeToken`]) for later resumption
    pub resume_token: Option<String>,
}
370
371// ============================================================================
372// Progress Tracking
373// ============================================================================
374
/// Progress event for uploads/downloads
///
/// Broadcast through [`ProgressTracker`] and intended for SSE delivery
/// (serialize-only: no `Deserialize` derive).
#[derive(Debug, Clone, Serialize)]
pub struct ProgressEvent {
    /// Operation ID
    pub operation_id: String,
    /// Current bytes processed
    pub bytes_processed: u64,
    /// Total bytes (if known)
    pub total_bytes: Option<u64>,
    /// Progress percentage (0-100)
    // NOTE(review): presumably None when total_bytes is unknown — confirm
    // at the emit sites.
    pub progress_percent: Option<f32>,
    /// Status message
    pub status: ProgressStatus,
}

/// Progress status
///
/// Serialized in lowercase (`InProgress` -> "inprogress").
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ProgressStatus {
    Started,
    InProgress,
    Completed,
    Failed,
}
399
400/// Progress tracker for streaming operations
401#[derive(Clone)]
402pub struct ProgressTracker {
403    sender: broadcast::Sender<ProgressEvent>,
404}
405
406impl ProgressTracker {
407    /// Create a new progress tracker
408    pub fn new() -> Self {
409        let (sender, _) = broadcast::channel(100);
410        Self { sender }
411    }
412
413    /// Send a progress update
414    pub fn send(&self, event: ProgressEvent) {
415        let _ = self.sender.send(event);
416    }
417
418    /// Subscribe to progress updates
419    pub fn subscribe(&self) -> broadcast::Receiver<ProgressEvent> {
420        self.sender.subscribe()
421    }
422}
423
424impl Default for ProgressTracker {
425    fn default() -> Self {
426        Self::new()
427    }
428}
429
430// ============================================================================
431// Streaming Downloads
432// ============================================================================
433
/// Stream download query parameters
#[derive(Debug, Deserialize)]
pub struct StreamDownloadQuery {
    /// Chunk size in bytes (default: dynamic based on flow control)
    // NOTE(review): not range-checked during deserialization; a value of 0
    // must be guarded by the handler, since `slice::chunks(0)` panics.
    pub chunk_size: Option<usize>,
    /// Maximum bytes per second (0 = unlimited)
    pub max_bytes_per_second: Option<u64>,
    /// Enable flow control (the handler defaults this to false)
    pub flow_control: Option<bool>,
    /// Resume token for continuing a previous download
    /// (takes precedence over `offset` in the handler)
    pub resume_token: Option<String>,
    /// Byte offset to start from (alternative to resume_token)
    pub offset: Option<u64>,
}
448
449/// Stream content download endpoint
450///
451/// GET /v1/stream/download/{cid}
452///
453/// Streams content in chunks for memory-efficient downloads with optional flow control and resume support.
454pub async fn stream_download(
455    State(state): State<GatewayState>,
456    Path(cid_str): Path<String>,
457    Query(query): Query<StreamDownloadQuery>,
458) -> Result<Response, StreamingError> {
459    let cid: Cid = cid_str
460        .parse()
461        .map_err(|_| StreamingError::InvalidCid(cid_str.clone()))?;
462
463    // Get the block
464    let block = state
465        .store
466        .get(&cid)
467        .await
468        .map_err(|e| StreamingError::Storage(e.to_string()))?
469        .ok_or_else(|| StreamingError::NotFound(cid_str.clone()))?;
470
471    let data = block.data().to_vec();
472    let total_size = data.len();
473
474    // Determine start offset (from resume token or explicit offset)
475    let start_offset = if let Some(resume_token) = &query.resume_token {
476        let token = ResumeToken::decode(resume_token)
477            .map_err(|e| StreamingError::Upload(format!("Invalid resume token: {}", e)))?;
478
479        // Validate CID matches
480        if let Some(token_cid) = &token.cid {
481            if token_cid != &cid_str {
482                return Err(StreamingError::Upload(
483                    "Resume token CID mismatch".to_string(),
484                ));
485            }
486        }
487
488        token.offset as usize
489    } else {
490        query.offset.unwrap_or(0) as usize
491    };
492
493    // Validate offset
494    if start_offset >= total_size {
495        return Err(StreamingError::Upload(format!(
496            "Invalid offset: {} (total size: {})",
497            start_offset, total_size
498        )));
499    }
500
501    // Initialize flow control if requested
502    let enable_flow_control = query.flow_control.unwrap_or(false);
503    let flow_controller = if enable_flow_control {
504        let mut config = FlowControlConfig::default();
505        if let Some(rate) = query.max_bytes_per_second {
506            config.max_bytes_per_second = rate;
507        }
508        Some(FlowController::new(config))
509    } else {
510        None
511    };
512
513    // Determine chunk size (from query, flow control, or default)
514    let chunk_size = query.chunk_size.unwrap_or_else(|| {
515        flow_controller
516            .as_ref()
517            .map(|fc| fc.window_size())
518            .unwrap_or(64 * 1024)
519    });
520
521    // Pre-collect chunks starting from offset to avoid lifetime issues
522    let chunks: Vec<Vec<u8>> = data[start_offset..]
523        .chunks(chunk_size)
524        .map(|chunk| chunk.to_vec())
525        .collect();
526
527    let remaining_size = total_size - start_offset;
528
529    // Create a stream that yields chunks with flow control
530    let stream = if let Some(mut fc) = flow_controller {
531        let stream = async_stream::stream! {
532            for chunk in chunks {
533                let chunk_len = chunk.len();
534
535                // Apply flow control delay
536                let delay = fc.calculate_delay(chunk_len);
537                if !delay.is_zero() {
538                    tokio::time::sleep(delay).await;
539                }
540
541                // Update flow control state
542                fc.on_data_sent(chunk_len);
543
544                yield Ok::<_, Infallible>(Bytes::from(chunk));
545            }
546        };
547        Body::from_stream(stream)
548    } else {
549        let stream = stream::iter(chunks).map(|chunk| Ok::<_, Infallible>(Bytes::from(chunk)));
550        Body::from_stream(stream)
551    };
552
553    // Build response with appropriate headers
554    let mut response_builder = Response::builder();
555
556    // If resuming, use 206 Partial Content
557    if start_offset > 0 {
558        response_builder = response_builder.status(StatusCode::PARTIAL_CONTENT);
559        // Content-Range: bytes start-end/total
560        let end_offset = total_size - 1;
561        response_builder = response_builder.header(
562            header::CONTENT_RANGE,
563            format!("bytes {}-{}/{}", start_offset, end_offset, total_size),
564        );
565    } else {
566        response_builder = response_builder.status(StatusCode::OK);
567    }
568
569    Ok(response_builder
570        .header(header::CONTENT_TYPE, "application/octet-stream")
571        .header(header::CONTENT_LENGTH, remaining_size.to_string())
572        .header("X-Chunk-Size", chunk_size.to_string())
573        .header("Accept-Ranges", "bytes")
574        .body(stream)
575        .unwrap())
576}
577
578// ============================================================================
579// Streaming Uploads
580// ============================================================================
581
/// Upload response
#[derive(Debug, Serialize)]
pub struct StreamUploadResponse {
    /// CID of the stored block (string form)
    pub cid: String,
    /// Size as reported by `Block::size()`
    pub size: u64,
    /// Number of multipart fields that contributed data
    pub chunks_received: usize,
}
589
590/// Stream upload endpoint with progress tracking
591///
592/// POST /v1/stream/upload
593///
594/// Accepts chunked uploads and provides progress updates via SSE.
595pub async fn stream_upload(
596    State(state): State<GatewayState>,
597    mut multipart: Multipart,
598) -> Result<Json<StreamUploadResponse>, StreamingError> {
599    let mut total_data = Vec::new();
600    let mut chunks_received = 0;
601
602    // Process multipart data
603    while let Some(field) = multipart
604        .next_field()
605        .await
606        .map_err(|e| StreamingError::Upload(format!("Failed to read field: {}", e)))?
607    {
608        let data = field
609            .bytes()
610            .await
611            .map_err(|e| StreamingError::Upload(format!("Failed to read data: {}", e)))?;
612
613        total_data.extend_from_slice(&data);
614        chunks_received += 1;
615    }
616
617    if total_data.is_empty() {
618        return Err(StreamingError::Upload("No data received".to_string()));
619    }
620
621    // Create and store the block
622    let block = Block::new(Bytes::from(total_data))
623        .map_err(|e| StreamingError::Upload(format!("Failed to create block: {}", e)))?;
624
625    let cid = *block.cid();
626    let size = block.size();
627
628    state
629        .store
630        .put(&block)
631        .await
632        .map_err(|e| StreamingError::Storage(e.to_string()))?;
633
634    info!("Stream upload completed: {} ({} bytes)", cid, size);
635
636    Ok(Json(StreamUploadResponse {
637        cid: cid.to_string(),
638        size,
639        chunks_received,
640    }))
641}
642
643// ============================================================================
644// Batch Operations
645// ============================================================================
646
/// Batch get request
#[derive(Debug, Deserialize)]
pub struct BatchGetRequest {
    /// List of CIDs to retrieve (string form; parsed per item by the handler)
    pub cids: Vec<String>,
}

/// Batch get response
///
/// Partial success is expressed by filling both lists: each requested CID
/// lands in either `blocks` or `errors`.
#[derive(Debug, Serialize)]
pub struct BatchGetResponse {
    /// Successfully retrieved blocks
    pub blocks: Vec<BatchBlockResult>,
    /// Failed CIDs with per-item error messages
    pub errors: Vec<BatchError>,
}

/// Individual block result in batch
#[derive(Debug, Serialize)]
pub struct BatchBlockResult {
    /// CID exactly as it appeared in the request
    pub cid: String,
    /// Block payload, standard base64 encoded
    pub data: String, // Base64 encoded
    /// Size as reported by `Block::size()`
    pub size: u64,
}

/// Batch error for individual items
#[derive(Debug, Serialize)]
pub struct BatchError {
    /// CID the error applies to
    pub cid: String,
    /// Human-readable error message
    pub error: String,
}
677
678/// Batch get endpoint (optimized with parallel processing)
679///
680/// POST /v1/block/batch/get
681///
682/// Retrieves multiple blocks in a single request.
683/// Uses parallel processing for high throughput.
684pub async fn batch_get(
685    State(state): State<GatewayState>,
686    Json(req): Json<BatchGetRequest>,
687) -> Result<Json<BatchGetResponse>, StreamingError> {
688    // Validate batch size
689    validation::validate_batch_size(req.cids.len()).map_err(StreamingError::Validation)?;
690
691    // Process all CIDs in parallel using concurrent tasks
692    let tasks: Vec<_> = req
693        .cids
694        .into_iter()
695        .map(|cid_str| {
696            let state = state.clone();
697            tokio::spawn(async move {
698                match cid_str.parse::<Cid>() {
699                    Ok(cid) => match state.store.get(&cid).await {
700                        Ok(Some(block)) => {
701                            let data_base64 = base64::Engine::encode(
702                                &base64::engine::general_purpose::STANDARD,
703                                block.data(),
704                            );
705                            Ok(BatchBlockResult {
706                                cid: cid_str,
707                                data: data_base64,
708                                size: block.size(),
709                            })
710                        }
711                        Ok(None) => Err(BatchError {
712                            cid: cid_str,
713                            error: "Block not found".to_string(),
714                        }),
715                        Err(e) => Err(BatchError {
716                            cid: cid_str,
717                            error: e.to_string(),
718                        }),
719                    },
720                    Err(_) => Err(BatchError {
721                        cid: cid_str,
722                        error: "Invalid CID".to_string(),
723                    }),
724                }
725            })
726        })
727        .collect();
728
729    // Await all tasks and collect results
730    let mut blocks = Vec::new();
731    let mut errors = Vec::new();
732
733    for task in tasks {
734        match task.await {
735            Ok(Ok(block)) => blocks.push(block),
736            Ok(Err(error)) => errors.push(error),
737            Err(e) => {
738                // Task panicked or was cancelled
739                errors.push(BatchError {
740                    cid: "unknown".to_string(),
741                    error: format!("Task execution error: {}", e),
742                });
743            }
744        }
745    }
746
747    Ok(Json(BatchGetResponse { blocks, errors }))
748}
749
/// Batch put request item
#[derive(Debug, Deserialize)]
pub struct BatchPutItem {
    /// Base64 encoded data (standard alphabet; decoded by the handlers)
    pub data: String,
}

/// Transaction mode for batch operations
///
/// Deserialized from lowercase strings: "atomic" / "besteffort".
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum TransactionMode {
    /// All-or-nothing: either all operations succeed or all fail
    Atomic,
    /// Best-effort: process each item independently (the default)
    #[default]
    BestEffort,
}

/// Batch put request
#[derive(Debug, Deserialize)]
pub struct BatchPutRequest {
    /// List of blocks to store
    pub blocks: Vec<BatchPutItem>,
    /// Transaction mode (default: best_effort)
    // NOTE(review): the actual wire value is "besteffort" — serde's
    // lowercase rename collapses the camel case — not "best_effort".
    #[serde(default)]
    pub transaction_mode: TransactionMode,
}
777
/// Batch put response
#[derive(Debug, Serialize)]
pub struct BatchPutResponse {
    /// Successfully stored blocks
    pub stored: Vec<BatchStoredResult>,
    /// Failed items, identified by input index
    pub errors: Vec<BatchPutError>,
    /// Transaction ID (present in atomic mode)
    // NOTE(review): both handlers in this file set this to Some(..) in
    // best-effort mode as well — the "atomic only" claim does not match
    // current behavior.
    pub transaction_id: Option<String>,
    /// Transaction status
    pub transaction_status: TransactionStatus,
}

/// Transaction status
///
/// Serialized in lowercase: "committed" / "partialsuccess" / "rolledback".
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum TransactionStatus {
    /// All operations succeeded
    Committed,
    /// Some operations failed (best-effort mode)
    PartialSuccess,
    /// All operations rolled back (atomic mode)
    RolledBack,
}

/// Stored block result
#[derive(Debug, Serialize)]
pub struct BatchStoredResult {
    /// CID of the stored block (string form)
    pub cid: String,
    /// Size as reported by `Block::size()`
    pub size: u64,
    /// Index of this item in the original request
    pub index: usize,
}

/// Batch put error
#[derive(Debug, Serialize)]
pub struct BatchPutError {
    /// Index of the failed item in the original request
    pub index: usize,
    /// Human-readable error message
    pub error: String,
}
817
818/// Batch put endpoint
819///
820/// POST /v1/block/batch/put
821///
822/// Stores multiple blocks in a single request.
823/// Supports atomic transactions (all-or-nothing) and best-effort mode.
824pub async fn batch_put(
825    State(state): State<GatewayState>,
826    Json(req): Json<BatchPutRequest>,
827) -> Result<Json<BatchPutResponse>, StreamingError> {
828    let transaction_id = Uuid::new_v4().to_string();
829
830    match req.transaction_mode {
831        TransactionMode::Atomic => batch_put_atomic(state, req.blocks, transaction_id).await,
832        TransactionMode::BestEffort => {
833            batch_put_best_effort(state, req.blocks, transaction_id).await
834        }
835    }
836}
837
/// Atomic batch put - all-or-nothing transaction
///
/// Two-phase approach:
///   1. Decode and construct every block up front; if *any* item fails,
///      nothing is written and the batch is reported as rolled back.
///   2. Store the prepared blocks one by one; on the first storage error,
///      blocks already stored by this batch are deleted and the batch is
///      reported as rolled back.
///
/// NOTE(review): the rollback is best-effort only — deletes may fail
/// silently, and concurrent readers can observe blocks in the window
/// between store and rollback. This is not an isolated transaction in the
/// database sense.
async fn batch_put_atomic(
    state: GatewayState,
    items: Vec<BatchPutItem>,
    transaction_id: String,
) -> Result<Json<BatchPutResponse>, StreamingError> {
    // Validate batch size
    validation::validate_batch_size(items.len()).map_err(StreamingError::Validation)?;

    // Phase 1: Validate all items and prepare blocks (no writes yet).
    let mut prepared_blocks = Vec::new();
    let mut errors = Vec::new();

    for (index, item) in items.into_iter().enumerate() {
        match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &item.data) {
            Ok(data) => match Block::new(Bytes::from(data)) {
                Ok(block) => {
                    // Original request index kept so the response can point
                    // back at the failing/successful item.
                    prepared_blocks.push((index, block));
                }
                Err(e) => {
                    errors.push(BatchPutError {
                        index,
                        error: format!("Block creation error: {}", e),
                    });
                }
            },
            Err(e) => {
                errors.push(BatchPutError {
                    index,
                    error: format!("Base64 decode error: {}", e),
                });
            }
        }
    }

    // If any validation failed, rollback (don't store anything)
    if !errors.is_empty() {
        info!(
            "Atomic batch put [{}] rolled back: {} validation errors",
            transaction_id,
            errors.len()
        );
        return Ok(Json(BatchPutResponse {
            stored: vec![],
            errors,
            transaction_id: Some(transaction_id),
            transaction_status: TransactionStatus::RolledBack,
        }));
    }

    // Phase 2: Store all blocks
    let mut stored = Vec::new();
    let mut stored_cids = Vec::new(); // Track for potential rollback

    for (index, block) in prepared_blocks {
        let cid = *block.cid();
        let size = block.size();

        match state.store.put(&block).await {
            Ok(_) => {
                stored_cids.push(cid);
                stored.push(BatchStoredResult {
                    cid: cid.to_string(),
                    size,
                    index,
                });
            }
            Err(e) => {
                // Storage failure - rollback all stored blocks
                info!(
                    "Atomic batch put [{}] rolling back: storage error at index {}",
                    transaction_id, index
                );

                // Attempt to delete all previously stored blocks in this transaction
                for stored_cid in stored_cids {
                    let _ = state.store.delete(&stored_cid).await; // Best effort rollback
                }

                // Only the storage error that triggered the rollback is
                // reported; earlier items were written and then deleted.
                return Ok(Json(BatchPutResponse {
                    stored: vec![],
                    errors: vec![BatchPutError {
                        index,
                        error: format!("Storage error (transaction rolled back): {}", e),
                    }],
                    transaction_id: Some(transaction_id),
                    transaction_status: TransactionStatus::RolledBack,
                }));
            }
        }
    }

    info!(
        "Atomic batch put [{}] committed: {} blocks stored",
        transaction_id,
        stored.len()
    );

    Ok(Json(BatchPutResponse {
        stored,
        errors: vec![],
        transaction_id: Some(transaction_id),
        transaction_status: TransactionStatus::Committed,
    }))
}
943
944/// Best-effort batch put - process each item independently
945async fn batch_put_best_effort(
946    state: GatewayState,
947    items: Vec<BatchPutItem>,
948    transaction_id: String,
949) -> Result<Json<BatchPutResponse>, StreamingError> {
950    // Validate batch size
951    validation::validate_batch_size(items.len()).map_err(StreamingError::Validation)?;
952
953    let mut stored = Vec::new();
954    let mut errors = Vec::new();
955
956    for (index, item) in items.into_iter().enumerate() {
957        match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &item.data) {
958            Ok(data) => match Block::new(Bytes::from(data)) {
959                Ok(block) => {
960                    let cid = *block.cid();
961                    let size = block.size();
962
963                    match state.store.put(&block).await {
964                        Ok(_) => {
965                            stored.push(BatchStoredResult {
966                                cid: cid.to_string(),
967                                size,
968                                index,
969                            });
970                        }
971                        Err(e) => {
972                            errors.push(BatchPutError {
973                                index,
974                                error: format!("Storage error: {}", e),
975                            });
976                        }
977                    }
978                }
979                Err(e) => {
980                    errors.push(BatchPutError {
981                        index,
982                        error: format!("Block creation error: {}", e),
983                    });
984                }
985            },
986            Err(e) => {
987                errors.push(BatchPutError {
988                    index,
989                    error: format!("Base64 decode error: {}", e),
990                });
991            }
992        }
993    }
994
995    let status = if errors.is_empty() {
996        TransactionStatus::Committed
997    } else {
998        TransactionStatus::PartialSuccess
999    };
1000
1001    info!(
1002        "Best-effort batch put [{}] completed: {} stored, {} errors",
1003        transaction_id,
1004        stored.len(),
1005        errors.len()
1006    );
1007
1008    Ok(Json(BatchPutResponse {
1009        stored,
1010        errors,
1011        transaction_id: Some(transaction_id),
1012        transaction_status: status,
1013    }))
1014}
1015
/// Batch has request
///
/// Body for `POST /v1/block/batch/has`. The handler rejects batches that are
/// empty or larger than `validation::validate_batch_size` allows.
#[derive(Debug, Deserialize)]
pub struct BatchHasRequest {
    /// List of CIDs to check
    pub cids: Vec<String>,
}
1022
/// Batch has response
///
/// Carries one existence result per checked CID.
#[derive(Debug, Serialize)]
pub struct BatchHasResponse {
    /// Results for each CID
    pub results: Vec<BatchHasResult>,
}
1029
/// Individual has result
#[derive(Debug, Serialize)]
pub struct BatchHasResult {
    /// The CID string as supplied in the request
    pub cid: String,
    /// Whether the block was found in the gateway's store; `false` is also
    /// returned for CID strings that fail to parse
    pub exists: bool,
}
1036
1037/// Batch has endpoint (optimized with parallel processing)
1038///
1039/// POST /v1/block/batch/has
1040///
1041/// Checks if multiple blocks exist in a single request.
1042/// Uses parallel processing for high throughput.
1043pub async fn batch_has(
1044    State(state): State<GatewayState>,
1045    Json(req): Json<BatchHasRequest>,
1046) -> Result<Json<BatchHasResponse>, StreamingError> {
1047    // Validate batch size
1048    validation::validate_batch_size(req.cids.len()).map_err(StreamingError::Validation)?;
1049
1050    // Process all CIDs in parallel using concurrent tasks
1051    let tasks: Vec<_> = req
1052        .cids
1053        .into_iter()
1054        .map(|cid_str| {
1055            let state = state.clone();
1056            tokio::spawn(async move {
1057                let exists = if let Ok(cid) = cid_str.parse::<Cid>() {
1058                    state.store.has(&cid).await.unwrap_or(false)
1059                } else {
1060                    false
1061                };
1062
1063                BatchHasResult {
1064                    cid: cid_str,
1065                    exists,
1066                }
1067            })
1068        })
1069        .collect();
1070
1071    // Await all tasks and collect results
1072    let mut results = Vec::new();
1073
1074    for task in tasks {
1075        match task.await {
1076            Ok(result) => results.push(result),
1077            Err(e) => {
1078                // Task panicked or was cancelled - mark as non-existent
1079                results.push(BatchHasResult {
1080                    cid: format!("task_error_{}", e),
1081                    exists: false,
1082                });
1083            }
1084        }
1085    }
1086
1087    Ok(Json(BatchHasResponse { results }))
1088}
1089
1090// ============================================================================
1091// Server-Sent Events (SSE) for Progress
1092// ============================================================================
1093
/// SSE progress stream endpoint
///
/// GET /v1/progress/{operation_id}
///
/// Provides real-time progress updates via Server-Sent Events.
///
/// Emits an initial `Started` event immediately, then forwards progress
/// events until a `Completed` or `Failed` event arrives or the broadcast
/// channel closes. If no event arrives within 30 seconds, an SSE comment is
/// sent as an application-level keepalive (in addition to axum's
/// `KeepAlive::default()`).
///
/// NOTE(review): the `ProgressTracker` is created locally and is not
/// registered anywhere under `operation_id`; only its receiver is captured by
/// the stream. Once this function returns, the tracker (and its sender) is
/// dropped, so `recv()` appears to return a closed-channel error and the
/// stream likely ends right after the initial event. Confirm whether the
/// tracker should instead be looked up from shared state keyed by
/// `operation_id`.
pub async fn progress_stream(
    State(_state): State<GatewayState>,
    Path(operation_id): Path<String>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // Create a progress tracker for this operation
    let tracker = ProgressTracker::new();
    let mut receiver = tracker.subscribe();

    // Create the SSE stream
    let stream = async_stream::stream! {
        // Send initial event so clients get immediate confirmation.
        let initial = ProgressEvent {
            operation_id: operation_id.clone(),
            bytes_processed: 0,
            total_bytes: None,
            progress_percent: Some(0.0),
            status: ProgressStatus::Started,
        };
        yield Ok(Event::default()
            .event("progress")
            .json_data(initial)
            .unwrap());

        // Stream progress updates
        loop {
            match tokio::time::timeout(Duration::from_secs(30), receiver.recv()).await {
                Ok(Ok(event)) => {
                    // Check for terminal status before the event is moved into json_data.
                    let is_complete = matches!(event.status, ProgressStatus::Completed | ProgressStatus::Failed);
                    yield Ok(Event::default()
                        .event("progress")
                        .json_data(event)
                        .unwrap());

                    if is_complete {
                        break;
                    }
                }
                Ok(Err(_)) => {
                    // Channel closed
                    break;
                }
                Err(_) => {
                    // Timeout - send keepalive
                    yield Ok(Event::default().comment("keepalive"));
                }
            }
        }
    };

    Sse::new(stream).keep_alive(KeepAlive::default())
}
1150
1151// ============================================================================
1152// Validation Utilities
1153// ============================================================================
1154
/// Request validation utilities
///
/// Small, dependency-free checks shared by the streaming handlers. Each
/// function returns `Ok(())` or a human-readable message suitable for a
/// 400-level response body.
pub mod validation {

    /// Validate CID string format
    ///
    /// Accepts CIDv0 (`Qm…`) and CIDv1 (`baf…`) prefixes with a minimal
    /// length check. This is a cheap sanity check, not full multibase or
    /// multihash validation — unparsable CIDs are still rejected later by
    /// `str::parse::<Cid>()` in the handlers.
    pub fn validate_cid(cid: &str) -> Result<(), String> {
        if cid.is_empty() {
            return Err("CID cannot be empty".to_string());
        }

        // `baf` already covers `bafy`, `bafk`, …, so one prefix check suffices
        // (the previous extra `bafy` test was redundant).
        if !cid.starts_with("Qm") && !cid.starts_with("baf") {
            return Err(format!("Invalid CID format: {}", cid));
        }

        if cid.len() < 10 {
            return Err(format!("CID too short: {}", cid));
        }

        Ok(())
    }

    /// Validate byte offset against a total size
    ///
    /// The offset must address a byte strictly inside the content, i.e.
    /// `offset < total_size`.
    pub fn validate_offset(offset: u64, total_size: usize) -> Result<(), String> {
        // Compare in the u64 domain: the previous `offset as usize` cast could
        // truncate on 32-bit targets and wrongly accept a huge offset.
        if offset >= total_size as u64 {
            return Err(format!(
                "Offset {} exceeds total size {}",
                offset, total_size
            ));
        }
        Ok(())
    }

    /// Validate chunk size (reasonable limits)
    ///
    /// Accepts sizes in the inclusive range 1 KB ..= 10 MB.
    pub fn validate_chunk_size(chunk_size: usize) -> Result<(), String> {
        const MIN_CHUNK_SIZE: usize = 1024; // 1KB
        const MAX_CHUNK_SIZE: usize = 10 * 1024 * 1024; // 10MB

        if chunk_size < MIN_CHUNK_SIZE {
            return Err(format!(
                "Chunk size {} is too small (minimum: {})",
                chunk_size, MIN_CHUNK_SIZE
            ));
        }

        if chunk_size > MAX_CHUNK_SIZE {
            return Err(format!(
                "Chunk size {} is too large (maximum: {})",
                chunk_size, MAX_CHUNK_SIZE
            ));
        }

        Ok(())
    }

    /// Validate rate limit
    ///
    /// `0` means unlimited and is accepted; anything above 10 GB/s is
    /// rejected as implausible.
    pub fn validate_rate_limit(bytes_per_second: u64) -> Result<(), String> {
        const MAX_RATE: u64 = 10 * 1024 * 1024 * 1024; // 10 GB/s

        if bytes_per_second > MAX_RATE {
            return Err(format!(
                "Rate limit {} exceeds maximum {}",
                bytes_per_second, MAX_RATE
            ));
        }

        Ok(())
    }

    /// Validate batch size
    ///
    /// Batches must be non-empty and contain at most 1000 items.
    pub fn validate_batch_size(count: usize) -> Result<(), String> {
        const MAX_BATCH_SIZE: usize = 1000;

        if count == 0 {
            return Err("Batch cannot be empty".to_string());
        }

        if count > MAX_BATCH_SIZE {
            return Err(format!(
                "Batch size {} exceeds maximum {}",
                count, MAX_BATCH_SIZE
            ));
        }

        Ok(())
    }
}
1241
1242// ============================================================================
1243// Error Types
1244// ============================================================================
1245
/// Streaming operation errors
#[derive(Debug)]
pub enum StreamingError {
    /// The supplied CID string could not be parsed (maps to HTTP 400)
    InvalidCid(String),
    /// No block with this CID exists in the store (maps to HTTP 404)
    NotFound(String),
    /// Malformed or failed upload payload (maps to HTTP 400)
    Upload(String),
    /// Underlying block-store failure (maps to HTTP 500)
    Storage(String),
    /// Request failed validation, e.g. batch or chunk size (maps to HTTP 400)
    Validation(String),
}
1255
1256impl IntoResponse for StreamingError {
1257    fn into_response(self) -> Response {
1258        let (status, message) = match self {
1259            StreamingError::InvalidCid(cid) => {
1260                (StatusCode::BAD_REQUEST, format!("Invalid CID: {}", cid))
1261            }
1262            StreamingError::NotFound(cid) => {
1263                (StatusCode::NOT_FOUND, format!("Block not found: {}", cid))
1264            }
1265            StreamingError::Upload(msg) => {
1266                (StatusCode::BAD_REQUEST, format!("Upload error: {}", msg))
1267            }
1268            StreamingError::Storage(msg) => (
1269                StatusCode::INTERNAL_SERVER_ERROR,
1270                format!("Storage error: {}", msg),
1271            ),
1272            StreamingError::Validation(msg) => (
1273                StatusCode::BAD_REQUEST,
1274                format!("Validation error: {}", msg),
1275            ),
1276        };
1277
1278        (status, message).into_response()
1279    }
1280}
1281
#[cfg(test)]
mod tests {
    use super::*;

    // ---- Progress events & tracker ----

    #[test]
    fn test_progress_event_serialization() {
        let event = ProgressEvent {
            operation_id: "test-123".to_string(),
            bytes_processed: 1024,
            total_bytes: Some(2048),
            progress_percent: Some(50.0),
            status: ProgressStatus::InProgress,
        };

        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("test-123"));
        assert!(json.contains("1024"));
        // Status enums serialize in lowercase.
        assert!(json.contains("inprogress"));
    }

    #[test]
    fn test_progress_tracker() {
        let tracker = ProgressTracker::new();
        let _receiver = tracker.subscribe();

        let event = ProgressEvent {
            operation_id: "test".to_string(),
            bytes_processed: 100,
            total_bytes: Some(200),
            progress_percent: Some(50.0),
            status: ProgressStatus::InProgress,
        };

        tracker.send(event);

        // Note: In async context, we would await the receiver
        // This test just verifies the tracker can be created and used
    }

    // ---- Batch request/response (de)serialization ----

    #[test]
    fn test_batch_request_deserialization() {
        let json = r#"{"cids": ["QmTest1", "QmTest2"]}"#;
        let req: BatchGetRequest = serde_json::from_str(json).unwrap();
        assert_eq!(req.cids.len(), 2);
        assert_eq!(req.cids[0], "QmTest1");
    }

    #[test]
    fn test_batch_put_request_deserialization() {
        let json = r#"{"blocks": [{"data": "SGVsbG8="}]}"#;
        let req: BatchPutRequest = serde_json::from_str(json).unwrap();
        assert_eq!(req.blocks.len(), 1);
        assert_eq!(req.blocks[0].data, "SGVsbG8=");
        assert_eq!(req.transaction_mode, TransactionMode::BestEffort); // Default
    }

    #[test]
    fn test_batch_put_request_atomic_mode() {
        let json = r#"{"blocks": [{"data": "SGVsbG8="}], "transaction_mode": "atomic"}"#;
        let req: BatchPutRequest = serde_json::from_str(json).unwrap();
        assert_eq!(req.transaction_mode, TransactionMode::Atomic);
    }

    #[test]
    fn test_transaction_mode_default() {
        let mode = TransactionMode::default();
        assert_eq!(mode, TransactionMode::BestEffort);
    }

    #[test]
    fn test_transaction_status_serialization() {
        let status = TransactionStatus::Committed;
        let json = serde_json::to_string(&status).unwrap();
        assert_eq!(json, r#""committed""#);

        let status = TransactionStatus::RolledBack;
        let json = serde_json::to_string(&status).unwrap();
        assert_eq!(json, r#""rolledback""#);
    }

    #[test]
    fn test_batch_put_response_with_transaction() {
        let response = BatchPutResponse {
            stored: vec![],
            errors: vec![],
            transaction_id: Some("test-txn-123".to_string()),
            transaction_status: TransactionStatus::Committed,
        };

        let json = serde_json::to_string(&response).unwrap();
        assert!(json.contains("test-txn-123"));
        assert!(json.contains("committed"));
    }

    // ---- Flow control configuration & controller ----

    #[test]
    fn test_flow_control_config_default() {
        let config = FlowControlConfig::default();
        assert_eq!(config.max_bytes_per_second, 0);
        assert_eq!(config.initial_window_size, 256 * 1024);
        assert_eq!(config.max_window_size, 1024 * 1024);
        assert_eq!(config.min_window_size, 64 * 1024);
        assert!(config.dynamic_adjustment);
    }

    #[test]
    fn test_flow_control_config_with_rate_limit() {
        let config = FlowControlConfig::with_rate_limit(1_000_000); // 1 MB/s
        assert_eq!(config.max_bytes_per_second, 1_000_000);
        assert!(config.dynamic_adjustment);
    }

    #[test]
    fn test_flow_control_config_conservative() {
        let config = FlowControlConfig::conservative();
        assert_eq!(config.initial_window_size, 64 * 1024);
        assert_eq!(config.max_window_size, 256 * 1024);
        assert_eq!(config.min_window_size, 32 * 1024);
    }

    #[test]
    fn test_flow_control_config_aggressive() {
        let config = FlowControlConfig::aggressive();
        assert_eq!(config.initial_window_size, 512 * 1024);
        assert_eq!(config.max_window_size, 2 * 1024 * 1024);
        assert_eq!(config.min_window_size, 128 * 1024);
    }

    #[test]
    fn test_flow_controller_window_size() {
        let config = FlowControlConfig::default();
        let controller = FlowController::new(config.clone());
        assert_eq!(controller.window_size(), config.initial_window_size);
    }

    #[test]
    fn test_flow_controller_no_rate_limit() {
        let config = FlowControlConfig::default(); // No rate limit (0)
        let controller = FlowController::new(config);

        // Should not delay when no rate limit
        let delay = controller.calculate_delay(1024);
        assert_eq!(delay, std::time::Duration::from_secs(0));
    }

    #[test]
    fn test_flow_controller_on_data_sent() {
        let config = FlowControlConfig::default();
        let mut controller = FlowController::new(config);

        controller.on_data_sent(1024);
        assert_eq!(controller.bytes_sent, 1024);

        controller.on_data_sent(2048);
        assert_eq!(controller.bytes_sent, 3072);
    }

    #[test]
    fn test_flow_controller_on_congestion() {
        let config = FlowControlConfig::default();
        let mut controller = FlowController::new(config.clone());

        let initial_window = controller.window_size();
        controller.on_congestion();

        // Window should decrease by 50%
        assert!(controller.window_size() < initial_window);
        assert!(controller.window_size() >= config.min_window_size);
    }

    #[test]
    fn test_flow_controller_throughput() {
        let config = FlowControlConfig::default();
        let mut controller = FlowController::new(config);

        // Send some data
        controller.on_data_sent(1024);

        // Throughput should be non-negative
        let throughput = controller.current_throughput();
        assert!(throughput >= 0.0);
    }

    // ---- Resume tokens & operation metadata ----

    #[test]
    fn test_resume_token_encode_decode() {
        let token = ResumeToken::new("op-123".to_string(), 4096, Some("QmTest123".to_string()));

        // Encode
        let encoded = token.encode().unwrap();
        assert!(!encoded.is_empty());

        // Decode
        let decoded = ResumeToken::decode(&encoded).unwrap();
        assert_eq!(decoded.operation_id, "op-123");
        assert_eq!(decoded.offset, 4096);
        assert_eq!(decoded.cid, Some("QmTest123".to_string()));
    }

    #[test]
    fn test_resume_token_invalid() {
        // Invalid base64
        let result = ResumeToken::decode("invalid!!!base64");
        assert!(result.is_err());

        // Valid base64 but invalid JSON
        let invalid_json = base64::Engine::encode(
            &base64::engine::general_purpose::URL_SAFE_NO_PAD,
            b"not json",
        );
        let result = ResumeToken::decode(&invalid_json);
        assert!(result.is_err());
    }

    #[test]
    fn test_operation_type_serialization() {
        let upload = OperationType::Upload;
        let json = serde_json::to_string(&upload).unwrap();
        assert_eq!(json, r#""upload""#);

        let download = OperationType::Download;
        let json = serde_json::to_string(&download).unwrap();
        assert_eq!(json, r#""download""#);
    }

    #[test]
    fn test_operation_status_serialization() {
        let status = OperationStatus::InProgress;
        let json = serde_json::to_string(&status).unwrap();
        assert_eq!(json, r#""inprogress""#);

        let status = OperationStatus::Cancelled;
        let json = serde_json::to_string(&status).unwrap();
        assert_eq!(json, r#""cancelled""#);
    }

    #[test]
    fn test_cancel_response_serialization() {
        let response = CancelResponse {
            operation_id: "op-456".to_string(),
            cancelled: true,
            resume_token: Some("token123".to_string()),
        };

        let json = serde_json::to_string(&response).unwrap();
        assert!(json.contains("op-456"));
        assert!(json.contains("true"));
        assert!(json.contains("token123"));
    }

    // Validation tests
    #[test]
    fn test_validate_cid_valid() {
        assert!(validation::validate_cid("QmTest123456").is_ok());
        assert!(validation::validate_cid(
            "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"
        )
        .is_ok());
        assert!(validation::validate_cid(
            "bafkreigh2akiscaildcqabsyg3dfr6chu3fgpregiymsck7e7aqa4s52zy"
        )
        .is_ok());
    }

    #[test]
    fn test_validate_cid_invalid() {
        assert!(validation::validate_cid("").is_err());
        assert!(validation::validate_cid("invalid").is_err());
        assert!(validation::validate_cid("Qm").is_err());
    }

    #[test]
    fn test_validate_offset_valid() {
        assert!(validation::validate_offset(0, 1000).is_ok());
        assert!(validation::validate_offset(500, 1000).is_ok());
        assert!(validation::validate_offset(999, 1000).is_ok());
    }

    #[test]
    fn test_validate_offset_invalid() {
        assert!(validation::validate_offset(1000, 1000).is_err());
        assert!(validation::validate_offset(2000, 1000).is_err());
    }

    #[test]
    fn test_validate_chunk_size_valid() {
        assert!(validation::validate_chunk_size(1024).is_ok()); // Minimum
        assert!(validation::validate_chunk_size(64 * 1024).is_ok()); // 64KB
        assert!(validation::validate_chunk_size(10 * 1024 * 1024).is_ok()); // Maximum
    }

    #[test]
    fn test_validate_chunk_size_invalid() {
        assert!(validation::validate_chunk_size(512).is_err()); // Too small
        assert!(validation::validate_chunk_size(20 * 1024 * 1024).is_err()); // Too large
    }

    #[test]
    fn test_validate_rate_limit_valid() {
        assert!(validation::validate_rate_limit(0).is_ok()); // Unlimited
        assert!(validation::validate_rate_limit(1_000_000).is_ok()); // 1 MB/s
        assert!(validation::validate_rate_limit(10 * 1024 * 1024 * 1024).is_ok());
        // Maximum
    }

    #[test]
    fn test_validate_rate_limit_invalid() {
        assert!(validation::validate_rate_limit(20 * 1024 * 1024 * 1024).is_err());
        // Too large
    }

    #[test]
    fn test_validate_batch_size_valid() {
        assert!(validation::validate_batch_size(1).is_ok());
        assert!(validation::validate_batch_size(100).is_ok());
        assert!(validation::validate_batch_size(1000).is_ok()); // Maximum
    }

    #[test]
    fn test_validate_batch_size_invalid() {
        assert!(validation::validate_batch_size(0).is_err()); // Empty
        assert!(validation::validate_batch_size(1001).is_err()); // Too large
        assert!(validation::validate_batch_size(5000).is_err()); // Way too large
    }

    // ---- Config self-validation ----

    #[test]
    fn test_flow_control_config_validation_valid() {
        let config = FlowControlConfig::default();
        assert!(config.validate().is_ok());

        let config = FlowControlConfig::conservative();
        assert!(config.validate().is_ok());

        let config = FlowControlConfig::aggressive();
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_flow_control_config_validation_invalid() {
        // Min window size exceeds initial window size
        let config = FlowControlConfig {
            max_bytes_per_second: 0,
            initial_window_size: 64 * 1024,
            max_window_size: 1024 * 1024,
            min_window_size: 128 * 1024, // Exceeds initial
            dynamic_adjustment: true,
        };
        assert!(config.validate().is_err());

        // Initial window size exceeds max window size
        let config = FlowControlConfig {
            max_bytes_per_second: 0,
            initial_window_size: 2 * 1024 * 1024,
            max_window_size: 1024 * 1024, // Less than initial
            min_window_size: 64 * 1024,
            dynamic_adjustment: true,
        };
        assert!(config.validate().is_err());

        // Rate limit too high
        let config = FlowControlConfig {
            max_bytes_per_second: 20 * 1024 * 1024 * 1024, // Too high
            initial_window_size: 256 * 1024,
            max_window_size: 1024 * 1024,
            min_window_size: 64 * 1024,
            dynamic_adjustment: true,
        };
        assert!(config.validate().is_err());
    }

    #[test]
    fn test_concurrency_config_default() {
        let config = ConcurrencyConfig::default();
        assert_eq!(config.max_concurrent_tasks, 100);
        assert!(config.parallel_enabled);
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_concurrency_config_conservative() {
        let config = ConcurrencyConfig::conservative();
        assert_eq!(config.max_concurrent_tasks, 50);
        assert!(config.parallel_enabled);
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_concurrency_config_aggressive() {
        let config = ConcurrencyConfig::aggressive();
        assert_eq!(config.max_concurrent_tasks, 200);
        assert!(config.parallel_enabled);
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_concurrency_config_sequential() {
        let config = ConcurrencyConfig::sequential();
        assert_eq!(config.max_concurrent_tasks, 1);
        assert!(!config.parallel_enabled);
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_concurrency_config_validation_invalid() {
        // 0 tasks with parallelism enabled is contradictory and must fail.
        let config = ConcurrencyConfig {
            max_concurrent_tasks: 0,
            parallel_enabled: true,
        };
        assert!(config.validate().is_err());
    }

    #[test]
    fn test_concurrency_config_validation_valid() {
        // 0 tasks is acceptable when parallelism is disabled.
        let config = ConcurrencyConfig {
            max_concurrent_tasks: 0,
            parallel_enabled: false,
        };
        assert!(config.validate().is_ok());

        let config = ConcurrencyConfig {
            max_concurrent_tasks: 100,
            parallel_enabled: true,
        };
        assert!(config.validate().is_ok());
    }
}