use axum::{
    body::Body,
    extract::{Multipart, Path, Query, State},
    http::{header, StatusCode},
    response::{
        sse::{Event, KeepAlive, Sse},
        IntoResponse, Response,
    },
    Json,
};
use bytes::Bytes;
use futures::stream::{self, Stream, StreamExt};
use ipfrs_core::{Block, Cid};
use ipfrs_storage::BlockStoreTrait;
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use std::time::Duration;
use tokio::sync::broadcast;
use tracing::info;
use uuid::Uuid;

use crate::gateway::GatewayState;

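/// Limits on how many tasks the gateway may run in parallel.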
#[derive(Debug, Clone)]
pub struct ConcurrencyConfig {
    /// Maximum number of tasks that may run concurrently.
    pub max_concurrent_tasks: usize,
    /// Whether parallel execution is enabled at all.
    pub parallel_enabled: bool,
}

impl Default for ConcurrencyConfig {
    fn default() -> Self {
        Self {
            max_concurrent_tasks: 100,
            parallel_enabled: true,
        }
    }
}

impl ConcurrencyConfig {
    /// A conservative preset: parallel, but with a modest task cap.
    pub fn conservative() -> Self {
        Self {
            max_concurrent_tasks: 50,
            parallel_enabled: true,
        }
    }

    /// An aggressive preset for high-throughput deployments.
    pub fn aggressive() -> Self {
        Self {
            max_concurrent_tasks: 200,
            parallel_enabled: true,
        }
    }

    /// A strictly sequential preset: one task at a time, parallelism off.
    pub fn sequential() -> Self {
        Self {
            max_concurrent_tasks: 1,
            parallel_enabled: false,
        }
    }

    /// Rejects configurations that enable parallelism with a zero task cap.
    pub fn validate(&self) -> Result<(), String> {
        if self.max_concurrent_tasks == 0 && self.parallel_enabled {
            return Err(
                "max_concurrent_tasks cannot be 0 when parallel_enabled is true".to_string(),
            );
        }
        Ok(())
    }
}

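/// Bandwidth and send-window settings for streaming downloads.
/// A `max_bytes_per_second` of 0 disables rate limiting.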
#[derive(Debug, Clone)]
pub struct FlowControlConfig {
    /// Maximum transfer rate in bytes per second (0 disables rate limiting).
    pub max_bytes_per_second: u64,
    /// Window size used when a transfer starts.
    pub initial_window_size: usize,
    /// Upper bound the window may grow to.
    pub max_window_size: usize,
    /// Lower bound the window may shrink to.
    pub min_window_size: usize,
    /// Whether the window is adjusted dynamically during a transfer.
    pub dynamic_adjustment: bool,
}

impl Default for FlowControlConfig {
    fn default() -> Self {
        Self {
            max_bytes_per_second: 0,
            initial_window_size: 256 * 1024,
            max_window_size: 1024 * 1024,
            min_window_size: 64 * 1024,
            dynamic_adjustment: true,
        }
    }
}

impl FlowControlConfig {
    /// Builds a default configuration with the given rate limit applied.
    pub fn with_rate_limit(bytes_per_second: u64) -> Self {
        Self {
            max_bytes_per_second: bytes_per_second,
            ..Default::default()
        }
    }

    /// A preset with small windows, suitable for constrained clients.
    pub fn conservative() -> Self {
        Self {
            initial_window_size: 64 * 1024,
            max_window_size: 256 * 1024,
            min_window_size: 32 * 1024,
            ..Default::default()
        }
    }

    /// A preset with large windows for high-bandwidth links.
    pub fn aggressive() -> Self {
        Self {
            initial_window_size: 512 * 1024,
            max_window_size: 2 * 1024 * 1024,
            min_window_size: 128 * 1024,
            ..Default::default()
        }
    }

    /// Checks that the window bounds are ordered and the rate limit is sane.
    pub fn validate(&self) -> Result<(), String> {
        if self.min_window_size > self.initial_window_size {
            return Err(format!(
                "Minimum window size ({}) cannot exceed initial window size ({})",
                self.min_window_size, self.initial_window_size
            ));
        }

        if self.initial_window_size > self.max_window_size {
            return Err(format!(
                "Initial window size ({}) cannot exceed maximum window size ({})",
                self.initial_window_size, self.max_window_size
            ));
        }

        if self.max_bytes_per_second > 0 {
            validation::validate_rate_limit(self.max_bytes_per_second)?;
        }

        Ok(())
    }
}

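/// Tracks bytes sent over time and derives inter-chunk delays and a send
/// window from a [`FlowControlConfig`]. The window grows by 10% at most
/// every 100 ms and is halved on congestion.
///
/// Illustrative sketch of the send loop (not compiled as a doctest; mirrors
/// how `stream_download` drives the controller):
///
/// ```ignore
/// let mut fc = FlowController::new(FlowControlConfig::with_rate_limit(1_000_000));
/// for chunk in data.chunks(fc.window_size()) {
///     let delay = fc.calculate_delay(chunk.len());
///     if !delay.is_zero() {
///         tokio::time::sleep(delay).await;
///     }
///     // ... send chunk ...
///     fc.on_data_sent(chunk.len());
/// }
/// ```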
#[derive(Debug)]
pub struct FlowController {
    config: FlowControlConfig,
    current_window_size: usize,
    bytes_sent: u64,
    start_time: std::time::Instant,
    last_adjustment: std::time::Instant,
}

impl FlowController {
    pub fn new(config: FlowControlConfig) -> Self {
        Self {
            current_window_size: config.initial_window_size,
            config,
            bytes_sent: 0,
            start_time: std::time::Instant::now(),
            last_adjustment: std::time::Instant::now(),
        }
    }

    /// Current send window size in bytes.
    pub fn window_size(&self) -> usize {
        self.current_window_size
    }

    /// Computes how long to wait before sending `bytes_to_send` so that the
    /// average rate stays at or below the configured limit.
    pub fn calculate_delay(&self, bytes_to_send: usize) -> std::time::Duration {
        if self.config.max_bytes_per_second == 0 {
            return std::time::Duration::from_secs(0);
        }

        let elapsed = self.start_time.elapsed();
        let elapsed_secs = elapsed.as_secs_f64();

        if elapsed_secs == 0.0 {
            return std::time::Duration::from_secs(0);
        }

        let current_rate = self.bytes_sent as f64 / elapsed_secs;
        let target_rate = self.config.max_bytes_per_second as f64;

        if current_rate + (bytes_to_send as f64 / elapsed_secs) > target_rate {
            // Delay long enough to send these bytes at the target rate.
            let delay_secs = (bytes_to_send as f64 / target_rate).max(0.0);
            std::time::Duration::from_secs_f64(delay_secs)
        } else {
            std::time::Duration::from_secs(0)
        }
    }

    /// Records sent bytes and, if enabled, grows the window.
    pub fn on_data_sent(&mut self, bytes: usize) {
        self.bytes_sent += bytes as u64;

        if self.config.dynamic_adjustment {
            self.adjust_window();
        }
    }

    /// Grows the window by 10%, at most once per 100 ms, clamped to the
    /// configured bounds.
    fn adjust_window(&mut self) {
        let elapsed = self.last_adjustment.elapsed();

        if elapsed < std::time::Duration::from_millis(100) {
            return;
        }

        self.last_adjustment = std::time::Instant::now();

        let new_size = (self.current_window_size as f64 * 1.1)
            .min(self.config.max_window_size as f64) as usize;

        self.current_window_size =
            new_size.clamp(self.config.min_window_size, self.config.max_window_size);
    }

    /// Backs off on congestion: halves the window, not going below the
    /// configured minimum.
    #[allow(dead_code)]
    pub fn on_congestion(&mut self) {
        let new_size = self.current_window_size / 2;
        self.current_window_size = new_size.max(self.config.min_window_size);
        self.last_adjustment = std::time::Instant::now();
    }

    /// Average throughput in bytes per second since the controller was created.
    pub fn current_throughput(&self) -> f64 {
        let elapsed = self.start_time.elapsed().as_secs_f64();
        if elapsed > 0.0 {
            self.bytes_sent as f64 / elapsed
        } else {
            0.0
        }
    }
}

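/// Snapshot of a resumable upload or download operation.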
#[derive(Debug, Clone)]
pub struct OperationState {
    pub operation_id: String,
    pub offset: u64,
    pub total_size: Option<u64>,
    pub operation_type: OperationType,
    pub status: OperationStatus,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum OperationType {
    Upload,
    Download,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum OperationStatus {
    InProgress,
    Paused,
    Cancelled,
    Completed,
    Failed,
}

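/// Opaque client token for resuming an interrupted transfer. The token is
/// JSON-serialized and then URL-safe base64 encoded without padding.
///
/// Illustrative round trip (not compiled as a doctest):
///
/// ```ignore
/// let token = ResumeToken::new("op-1".into(), 4096, None);
/// let encoded = token.encode().unwrap();
/// let decoded = ResumeToken::decode(&encoded).unwrap();
/// assert_eq!(decoded.offset, 4096);
/// ```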
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResumeToken {
    /// Identifier of the interrupted operation.
    pub operation_id: String,
    /// Byte offset at which to resume.
    pub offset: u64,
    /// CID of the content, when the token is bound to a specific block.
    pub cid: Option<String>,
}

impl ResumeToken {
    pub fn new(operation_id: String, offset: u64, cid: Option<String>) -> Self {
        Self {
            operation_id,
            offset,
            cid,
        }
    }

    /// Serializes the token to URL-safe base64 (no padding).
    pub fn encode(&self) -> Result<String, String> {
        let json = serde_json::to_string(self).map_err(|e| e.to_string())?;
        Ok(base64::Engine::encode(
            &base64::engine::general_purpose::URL_SAFE_NO_PAD,
            json.as_bytes(),
        ))
    }

    /// Parses a token previously produced by [`ResumeToken::encode`].
    pub fn decode(encoded: &str) -> Result<Self, String> {
        let bytes =
            base64::Engine::decode(&base64::engine::general_purpose::URL_SAFE_NO_PAD, encoded)
                .map_err(|e| e.to_string())?;

        let json = String::from_utf8(bytes).map_err(|e| e.to_string())?;
        serde_json::from_str(&json).map_err(|e| e.to_string())
    }
}

#[derive(Debug, Deserialize)]
pub struct CancelRequest {
    pub operation_id: String,
}

#[derive(Debug, Serialize)]
pub struct CancelResponse {
    pub operation_id: String,
    pub cancelled: bool,
    /// Token the client can use to resume the cancelled operation, if any.
    pub resume_token: Option<String>,
}

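/// A single progress update pushed to SSE subscribers.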
#[derive(Debug, Clone, Serialize)]
pub struct ProgressEvent {
    pub operation_id: String,
    pub bytes_processed: u64,
    pub total_bytes: Option<u64>,
    pub progress_percent: Option<f32>,
    pub status: ProgressStatus,
}

#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ProgressStatus {
    Started,
    InProgress,
    Completed,
    Failed,
}

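/// Fan-out progress channel: every subscriber receives a copy of each event.
/// Backed by a `tokio::sync::broadcast` channel.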
#[derive(Clone)]
pub struct ProgressTracker {
    sender: broadcast::Sender<ProgressEvent>,
}

impl ProgressTracker {
    pub fn new() -> Self {
        // Buffer up to 100 events; lagging subscribers may miss older ones.
        let (sender, _) = broadcast::channel(100);
        Self { sender }
    }

    /// Broadcasts an event; send errors (no active subscribers) are ignored.
    pub fn send(&self, event: ProgressEvent) {
        let _ = self.sender.send(event);
    }

    pub fn subscribe(&self) -> broadcast::Receiver<ProgressEvent> {
        self.sender.subscribe()
    }
}

impl Default for ProgressTracker {
    fn default() -> Self {
        Self::new()
    }
}

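/// Query parameters accepted by [`stream_download`].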
#[derive(Debug, Deserialize)]
pub struct StreamDownloadQuery {
    /// Chunk size in bytes; defaults to the flow-control window or 64 KiB.
    pub chunk_size: Option<usize>,
    /// Optional rate limit in bytes per second.
    pub max_bytes_per_second: Option<u64>,
    /// Enables flow control for this download (off by default).
    pub flow_control: Option<bool>,
    /// Resume token from a previously interrupted download.
    pub resume_token: Option<String>,
    /// Explicit byte offset to start from (ignored if a resume token is given).
    pub offset: Option<u64>,
}

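/// Handler that streams a block's bytes in chunks, with optional flow
/// control, rate limiting, and resumption via a [`ResumeToken`] or an
/// explicit offset. Resumed requests are answered with 206 Partial Content.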
pub async fn stream_download(
    State(state): State<GatewayState>,
    Path(cid_str): Path<String>,
    Query(query): Query<StreamDownloadQuery>,
) -> Result<Response, StreamingError> {
    let cid: Cid = cid_str
        .parse()
        .map_err(|_| StreamingError::InvalidCid(cid_str.clone()))?;

    let block = state
        .store
        .get(&cid)
        .await
        .map_err(|e| StreamingError::Storage(e.to_string()))?
        .ok_or_else(|| StreamingError::NotFound(cid_str.clone()))?;

    let data = block.data().to_vec();
    let total_size = data.len();

    // A resume token takes precedence over an explicit offset.
    let start_offset = if let Some(resume_token) = &query.resume_token {
        let token = ResumeToken::decode(resume_token)
            .map_err(|e| StreamingError::Upload(format!("Invalid resume token: {}", e)))?;

        if let Some(token_cid) = &token.cid {
            if token_cid != &cid_str {
                return Err(StreamingError::Upload(
                    "Resume token CID mismatch".to_string(),
                ));
            }
        }

        token.offset as usize
    } else {
        query.offset.unwrap_or(0) as usize
    };

    if start_offset >= total_size {
        return Err(StreamingError::Upload(format!(
            "Invalid offset: {} (total size: {})",
            start_offset, total_size
        )));
    }

    let enable_flow_control = query.flow_control.unwrap_or(false);
    let flow_controller = if enable_flow_control {
        let mut config = FlowControlConfig::default();
        if let Some(rate) = query.max_bytes_per_second {
            config.max_bytes_per_second = rate;
        }
        Some(FlowController::new(config))
    } else {
        None
    };

    // Default the chunk size to the flow-control window, or 64 KiB.
    let chunk_size = query.chunk_size.unwrap_or_else(|| {
        flow_controller
            .as_ref()
            .map(|fc| fc.window_size())
            .unwrap_or(64 * 1024)
    });

    let chunks: Vec<Vec<u8>> = data[start_offset..]
        .chunks(chunk_size)
        .map(|chunk| chunk.to_vec())
        .collect();

    let remaining_size = total_size - start_offset;

    let stream = if let Some(mut fc) = flow_controller {
        // Rate-limited path: sleep between chunks when the controller asks.
        let stream = async_stream::stream! {
            for chunk in chunks {
                let chunk_len = chunk.len();

                let delay = fc.calculate_delay(chunk_len);
                if !delay.is_zero() {
                    tokio::time::sleep(delay).await;
                }

                fc.on_data_sent(chunk_len);

                yield Ok::<_, Infallible>(Bytes::from(chunk));
            }
        };
        Body::from_stream(stream)
    } else {
        let stream = stream::iter(chunks).map(|chunk| Ok::<_, Infallible>(Bytes::from(chunk)));
        Body::from_stream(stream)
    };

    let mut response_builder = Response::builder();

    // Resumed downloads get 206 Partial Content plus a Content-Range header.
    if start_offset > 0 {
        response_builder = response_builder.status(StatusCode::PARTIAL_CONTENT);
        let end_offset = total_size - 1;
        response_builder = response_builder.header(
            header::CONTENT_RANGE,
            format!("bytes {}-{}/{}", start_offset, end_offset, total_size),
        );
    } else {
        response_builder = response_builder.status(StatusCode::OK);
    }

    Ok(response_builder
        .header(header::CONTENT_TYPE, "application/octet-stream")
        .header(header::CONTENT_LENGTH, remaining_size.to_string())
        .header("X-Chunk-Size", chunk_size.to_string())
        .header("Accept-Ranges", "bytes")
        .body(stream)
        .unwrap())
}

#[derive(Debug, Serialize)]
pub struct StreamUploadResponse {
    pub cid: String,
    pub size: u64,
    pub chunks_received: usize,
}

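/// Handler that accepts a multipart upload, concatenates all fields into a
/// single block, stores it, and returns the resulting CID.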
pub async fn stream_upload(
    State(state): State<GatewayState>,
    mut multipart: Multipart,
) -> Result<Json<StreamUploadResponse>, StreamingError> {
    let mut total_data = Vec::new();
    let mut chunks_received = 0;

    // Accumulate every multipart field into one buffer.
    while let Some(field) = multipart
        .next_field()
        .await
        .map_err(|e| StreamingError::Upload(format!("Failed to read field: {}", e)))?
    {
        let data = field
            .bytes()
            .await
            .map_err(|e| StreamingError::Upload(format!("Failed to read data: {}", e)))?;

        total_data.extend_from_slice(&data);
        chunks_received += 1;
    }

    if total_data.is_empty() {
        return Err(StreamingError::Upload("No data received".to_string()));
    }

    let block = Block::new(Bytes::from(total_data))
        .map_err(|e| StreamingError::Upload(format!("Failed to create block: {}", e)))?;

    let cid = *block.cid();
    let size = block.size();

    state
        .store
        .put(&block)
        .await
        .map_err(|e| StreamingError::Storage(e.to_string()))?;

    info!("Stream upload completed: {} ({} bytes)", cid, size);

    Ok(Json(StreamUploadResponse {
        cid: cid.to_string(),
        size,
        chunks_received,
    }))
}

#[derive(Debug, Deserialize)]
pub struct BatchGetRequest {
    pub cids: Vec<String>,
}

#[derive(Debug, Serialize)]
pub struct BatchGetResponse {
    pub blocks: Vec<BatchBlockResult>,
    pub errors: Vec<BatchError>,
}

#[derive(Debug, Serialize)]
pub struct BatchBlockResult {
    pub cid: String,
    /// Base64-encoded block data.
    pub data: String,
    pub size: u64,
}

#[derive(Debug, Serialize)]
pub struct BatchError {
    pub cid: String,
    pub error: String,
}

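/// Handler that fetches up to 1000 blocks concurrently, returning
/// base64-encoded data alongside per-CID errors.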
pub async fn batch_get(
    State(state): State<GatewayState>,
    Json(req): Json<BatchGetRequest>,
) -> Result<Json<BatchGetResponse>, StreamingError> {
    validation::validate_batch_size(req.cids.len()).map_err(StreamingError::Validation)?;

    // Fetch every block concurrently; each task resolves to a result or an error.
    let tasks: Vec<_> = req
        .cids
        .into_iter()
        .map(|cid_str| {
            let state = state.clone();
            tokio::spawn(async move {
                match cid_str.parse::<Cid>() {
                    Ok(cid) => match state.store.get(&cid).await {
                        Ok(Some(block)) => {
                            let data_base64 = base64::Engine::encode(
                                &base64::engine::general_purpose::STANDARD,
                                block.data(),
                            );
                            Ok(BatchBlockResult {
                                cid: cid_str,
                                data: data_base64,
                                size: block.size(),
                            })
                        }
                        Ok(None) => Err(BatchError {
                            cid: cid_str,
                            error: "Block not found".to_string(),
                        }),
                        Err(e) => Err(BatchError {
                            cid: cid_str,
                            error: e.to_string(),
                        }),
                    },
                    Err(_) => Err(BatchError {
                        cid: cid_str,
                        error: "Invalid CID".to_string(),
                    }),
                }
            })
        })
        .collect();

    let mut blocks = Vec::new();
    let mut errors = Vec::new();

    for task in tasks {
        match task.await {
            Ok(Ok(block)) => blocks.push(block),
            Ok(Err(error)) => errors.push(error),
            Err(e) => {
                // The task panicked or was cancelled; the CID is no longer known.
                errors.push(BatchError {
                    cid: "unknown".to_string(),
                    error: format!("Task execution error: {}", e),
                });
            }
        }
    }

    Ok(Json(BatchGetResponse { blocks, errors }))
}

#[derive(Debug, Deserialize)]
pub struct BatchPutItem {
    /// Base64-encoded block data.
    pub data: String,
}

#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum TransactionMode {
    /// All blocks are stored, or none are (failures roll back prior writes).
    Atomic,
    /// Store as many blocks as possible; failures are reported per item.
    #[default]
    BestEffort,
}

#[derive(Debug, Deserialize)]
pub struct BatchPutRequest {
    pub blocks: Vec<BatchPutItem>,
    #[serde(default)]
    pub transaction_mode: TransactionMode,
}

#[derive(Debug, Serialize)]
pub struct BatchPutResponse {
    pub stored: Vec<BatchStoredResult>,
    pub errors: Vec<BatchPutError>,
    pub transaction_id: Option<String>,
    pub transaction_status: TransactionStatus,
}

#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum TransactionStatus {
    /// Every block in the batch was stored.
    Committed,
    /// Some blocks were stored, others failed (best-effort mode only).
    PartialSuccess,
    /// No blocks remain stored; the batch was aborted.
    RolledBack,
}

#[derive(Debug, Serialize)]
pub struct BatchStoredResult {
    pub cid: String,
    pub size: u64,
    pub index: usize,
}

#[derive(Debug, Serialize)]
pub struct BatchPutError {
    pub index: usize,
    pub error: String,
}

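/// Handler that stores a batch of base64-encoded blocks under a fresh
/// transaction ID, dispatching on the requested [`TransactionMode`].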
pub async fn batch_put(
    State(state): State<GatewayState>,
    Json(req): Json<BatchPutRequest>,
) -> Result<Json<BatchPutResponse>, StreamingError> {
    let transaction_id = Uuid::new_v4().to_string();

    match req.transaction_mode {
        TransactionMode::Atomic => batch_put_atomic(state, req.blocks, transaction_id).await,
        TransactionMode::BestEffort => {
            batch_put_best_effort(state, req.blocks, transaction_id).await
        }
    }
}

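/// Atomic mode: validate everything first, then write; any storage failure
/// deletes the blocks already written and reports the batch as rolled back.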
async fn batch_put_atomic(
    state: GatewayState,
    items: Vec<BatchPutItem>,
    transaction_id: String,
) -> Result<Json<BatchPutResponse>, StreamingError> {
    validation::validate_batch_size(items.len()).map_err(StreamingError::Validation)?;

    // Phase 1: decode and validate every item before touching the store.
    let mut prepared_blocks = Vec::new();
    let mut errors = Vec::new();

    for (index, item) in items.into_iter().enumerate() {
        match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &item.data) {
            Ok(data) => match Block::new(Bytes::from(data)) {
                Ok(block) => {
                    prepared_blocks.push((index, block));
                }
                Err(e) => {
                    errors.push(BatchPutError {
                        index,
                        error: format!("Block creation error: {}", e),
                    });
                }
            },
            Err(e) => {
                errors.push(BatchPutError {
                    index,
                    error: format!("Base64 decode error: {}", e),
                });
            }
        }
    }

    // Any validation error aborts the whole transaction before any write.
    if !errors.is_empty() {
        info!(
            "Atomic batch put [{}] rolled back: {} validation errors",
            transaction_id,
            errors.len()
        );
        return Ok(Json(BatchPutResponse {
            stored: vec![],
            errors,
            transaction_id: Some(transaction_id),
            transaction_status: TransactionStatus::RolledBack,
        }));
    }

    // Phase 2: write the blocks, remembering CIDs so a failure can roll back.
    let mut stored = Vec::new();
    let mut stored_cids = Vec::new();

    for (index, block) in prepared_blocks {
        let cid = *block.cid();
        let size = block.size();

        match state.store.put(&block).await {
            Ok(_) => {
                stored_cids.push(cid);
                stored.push(BatchStoredResult {
                    cid: cid.to_string(),
                    size,
                    index,
                });
            }
            Err(e) => {
                info!(
                    "Atomic batch put [{}] rolling back: storage error at index {}",
                    transaction_id, index
                );

                // Best-effort rollback: delete what was already written,
                // ignoring individual deletion failures.
                for stored_cid in stored_cids {
                    let _ = state.store.delete(&stored_cid).await;
                }

                return Ok(Json(BatchPutResponse {
                    stored: vec![],
                    errors: vec![BatchPutError {
                        index,
                        error: format!("Storage error (transaction rolled back): {}", e),
                    }],
                    transaction_id: Some(transaction_id),
                    transaction_status: TransactionStatus::RolledBack,
                }));
            }
        }
    }

    info!(
        "Atomic batch put [{}] committed: {} blocks stored",
        transaction_id,
        stored.len()
    );

    Ok(Json(BatchPutResponse {
        stored,
        errors: vec![],
        transaction_id: Some(transaction_id),
        transaction_status: TransactionStatus::Committed,
    }))
}

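/// Best-effort mode: each item is decoded and stored independently; failures
/// are collected per index without affecting the other items.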
async fn batch_put_best_effort(
    state: GatewayState,
    items: Vec<BatchPutItem>,
    transaction_id: String,
) -> Result<Json<BatchPutResponse>, StreamingError> {
    validation::validate_batch_size(items.len()).map_err(StreamingError::Validation)?;

    let mut stored = Vec::new();
    let mut errors = Vec::new();

    for (index, item) in items.into_iter().enumerate() {
        match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &item.data) {
            Ok(data) => match Block::new(Bytes::from(data)) {
                Ok(block) => {
                    let cid = *block.cid();
                    let size = block.size();

                    match state.store.put(&block).await {
                        Ok(_) => {
                            stored.push(BatchStoredResult {
                                cid: cid.to_string(),
                                size,
                                index,
                            });
                        }
                        Err(e) => {
                            errors.push(BatchPutError {
                                index,
                                error: format!("Storage error: {}", e),
                            });
                        }
                    }
                }
                Err(e) => {
                    errors.push(BatchPutError {
                        index,
                        error: format!("Block creation error: {}", e),
                    });
                }
            },
            Err(e) => {
                errors.push(BatchPutError {
                    index,
                    error: format!("Base64 decode error: {}", e),
                });
            }
        }
    }

    let status = if errors.is_empty() {
        TransactionStatus::Committed
    } else {
        TransactionStatus::PartialSuccess
    };

    info!(
        "Best-effort batch put [{}] completed: {} stored, {} errors",
        transaction_id,
        stored.len(),
        errors.len()
    );

    Ok(Json(BatchPutResponse {
        stored,
        errors,
        transaction_id: Some(transaction_id),
        transaction_status: status,
    }))
}

#[derive(Debug, Deserialize)]
pub struct BatchHasRequest {
    pub cids: Vec<String>,
}

#[derive(Debug, Serialize)]
pub struct BatchHasResponse {
    pub results: Vec<BatchHasResult>,
}

#[derive(Debug, Serialize)]
pub struct BatchHasResult {
    pub cid: String,
    pub exists: bool,
}

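/// Handler that checks block existence for a batch of CIDs concurrently.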
pub async fn batch_has(
    State(state): State<GatewayState>,
    Json(req): Json<BatchHasRequest>,
) -> Result<Json<BatchHasResponse>, StreamingError> {
    validation::validate_batch_size(req.cids.len()).map_err(StreamingError::Validation)?;

    // Check existence concurrently; unparseable CIDs simply report `false`.
    let tasks: Vec<_> = req
        .cids
        .into_iter()
        .map(|cid_str| {
            let state = state.clone();
            tokio::spawn(async move {
                let exists = if let Ok(cid) = cid_str.parse::<Cid>() {
                    state.store.has(&cid).await.unwrap_or(false)
                } else {
                    false
                };

                BatchHasResult {
                    cid: cid_str,
                    exists,
                }
            })
        })
        .collect();

    let mut results = Vec::new();

    for task in tasks {
        match task.await {
            Ok(result) => results.push(result),
            Err(e) => {
                // The task panicked; push a synthetic entry so counts still match.
                results.push(BatchHasResult {
                    cid: format!("task_error_{}", e),
                    exists: false,
                });
            }
        }
    }

    Ok(Json(BatchHasResponse { results }))
}

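/// SSE endpoint that emits `progress` events for an operation, with a
/// keepalive comment after every 30 seconds of inactivity.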
pub async fn progress_stream(
    State(_state): State<GatewayState>,
    Path(operation_id): Path<String>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // NOTE: the tracker is created locally and its sender is dropped when this
    // handler returns, so the stream emits the initial event and then closes;
    // live updates require wiring a shared, per-operation tracker instead.
    let tracker = ProgressTracker::new();
    let mut receiver = tracker.subscribe();

    let stream = async_stream::stream! {
        // Emit an initial "started" event immediately.
        let initial = ProgressEvent {
            operation_id: operation_id.clone(),
            bytes_processed: 0,
            total_bytes: None,
            progress_percent: Some(0.0),
            status: ProgressStatus::Started,
        };
        yield Ok(Event::default()
            .event("progress")
            .json_data(initial)
            .unwrap());

        loop {
            match tokio::time::timeout(Duration::from_secs(30), receiver.recv()).await {
                Ok(Ok(event)) => {
                    let is_complete = matches!(event.status, ProgressStatus::Completed | ProgressStatus::Failed);
                    yield Ok(Event::default()
                        .event("progress")
                        .json_data(event)
                        .unwrap());

                    if is_complete {
                        break;
                    }
                }
                Ok(Err(_)) => {
                    // Channel closed or the receiver lagged too far behind.
                    break;
                }
                Err(_) => {
                    // No event within 30 seconds: send a keepalive comment.
                    yield Ok(Event::default().comment("keepalive"));
                }
            }
        }
    };

    Sse::new(stream).keep_alive(KeepAlive::default())
}

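/// Lightweight request-parameter validation helpers shared by the handlers
/// above.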
pub mod validation {
    /// Performs a cheap, prefix-based sanity check on a CID string.
    pub fn validate_cid(cid: &str) -> Result<(), String> {
        if cid.is_empty() {
            return Err("CID cannot be empty".to_string());
        }

        // CIDv0 begins with "Qm"; base32 CIDv1 begins with "baf" (e.g. "bafy...").
        if !cid.starts_with("Qm") && !cid.starts_with("bafy") && !cid.starts_with("baf") {
            return Err(format!("Invalid CID format: {}", cid));
        }

        if cid.len() < 10 {
            return Err(format!("CID too short: {}", cid));
        }

        Ok(())
    }

    /// Rejects offsets at or beyond the end of the content.
    pub fn validate_offset(offset: u64, total_size: usize) -> Result<(), String> {
        if offset as usize >= total_size {
            return Err(format!(
                "Offset {} exceeds total size {}",
                offset, total_size
            ));
        }
        Ok(())
    }

    /// Accepts chunk sizes between 1 KiB and 10 MiB inclusive.
    pub fn validate_chunk_size(chunk_size: usize) -> Result<(), String> {
        const MIN_CHUNK_SIZE: usize = 1024;
        const MAX_CHUNK_SIZE: usize = 10 * 1024 * 1024;

        if chunk_size < MIN_CHUNK_SIZE {
            return Err(format!(
                "Chunk size {} is too small (minimum: {})",
                chunk_size, MIN_CHUNK_SIZE
            ));
        }

        if chunk_size > MAX_CHUNK_SIZE {
            return Err(format!(
                "Chunk size {} is too large (maximum: {})",
                chunk_size, MAX_CHUNK_SIZE
            ));
        }

        Ok(())
    }

    /// Accepts rate limits up to 10 GiB/s inclusive.
    pub fn validate_rate_limit(bytes_per_second: u64) -> Result<(), String> {
        const MAX_RATE: u64 = 10 * 1024 * 1024 * 1024;

        if bytes_per_second > MAX_RATE {
            return Err(format!(
                "Rate limit {} exceeds maximum {}",
                bytes_per_second, MAX_RATE
            ));
        }

        Ok(())
    }

    /// Accepts batch sizes between 1 and 1000 items inclusive.
    pub fn validate_batch_size(count: usize) -> Result<(), String> {
        const MAX_BATCH_SIZE: usize = 1000;

        if count == 0 {
            return Err("Batch cannot be empty".to_string());
        }

        if count > MAX_BATCH_SIZE {
            return Err(format!(
                "Batch size {} exceeds maximum {}",
                count, MAX_BATCH_SIZE
            ));
        }

        Ok(())
    }
}

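/// Errors returned by the streaming and batch handlers; each variant maps to
/// an HTTP status in the `IntoResponse` impl below.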
#[derive(Debug)]
pub enum StreamingError {
    InvalidCid(String),
    NotFound(String),
    Upload(String),
    Storage(String),
    Validation(String),
}

impl IntoResponse for StreamingError {
    fn into_response(self) -> Response {
        let (status, message) = match self {
            StreamingError::InvalidCid(cid) => {
                (StatusCode::BAD_REQUEST, format!("Invalid CID: {}", cid))
            }
            StreamingError::NotFound(cid) => {
                (StatusCode::NOT_FOUND, format!("Block not found: {}", cid))
            }
            StreamingError::Upload(msg) => {
                (StatusCode::BAD_REQUEST, format!("Upload error: {}", msg))
            }
            StreamingError::Storage(msg) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Storage error: {}", msg),
            ),
            StreamingError::Validation(msg) => (
                StatusCode::BAD_REQUEST,
                format!("Validation error: {}", msg),
            ),
        };

        (status, message).into_response()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_progress_event_serialization() {
        let event = ProgressEvent {
            operation_id: "test-123".to_string(),
            bytes_processed: 1024,
            total_bytes: Some(2048),
            progress_percent: Some(50.0),
            status: ProgressStatus::InProgress,
        };

        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("test-123"));
        assert!(json.contains("1024"));
        assert!(json.contains("inprogress"));
    }

    #[test]
    fn test_progress_tracker() {
        let tracker = ProgressTracker::new();
        let _receiver = tracker.subscribe();

        let event = ProgressEvent {
            operation_id: "test".to_string(),
            bytes_processed: 100,
            total_bytes: Some(200),
            progress_percent: Some(50.0),
            status: ProgressStatus::InProgress,
        };

        // Must not panic even if the subscriber never consumes the event.
        tracker.send(event);
    }

    #[test]
    fn test_batch_request_deserialization() {
        let json = r#"{"cids": ["QmTest1", "QmTest2"]}"#;
        let req: BatchGetRequest = serde_json::from_str(json).unwrap();
        assert_eq!(req.cids.len(), 2);
        assert_eq!(req.cids[0], "QmTest1");
    }

    #[test]
    fn test_batch_put_request_deserialization() {
        let json = r#"{"blocks": [{"data": "SGVsbG8="}]}"#;
        let req: BatchPutRequest = serde_json::from_str(json).unwrap();
        assert_eq!(req.blocks.len(), 1);
        assert_eq!(req.blocks[0].data, "SGVsbG8=");
        // transaction_mode defaults to best-effort when omitted.
        assert_eq!(req.transaction_mode, TransactionMode::BestEffort);
    }

    #[test]
    fn test_batch_put_request_atomic_mode() {
        let json = r#"{"blocks": [{"data": "SGVsbG8="}], "transaction_mode": "atomic"}"#;
        let req: BatchPutRequest = serde_json::from_str(json).unwrap();
        assert_eq!(req.transaction_mode, TransactionMode::Atomic);
    }

    #[test]
    fn test_transaction_mode_default() {
        let mode = TransactionMode::default();
        assert_eq!(mode, TransactionMode::BestEffort);
    }

    #[test]
    fn test_transaction_status_serialization() {
        let status = TransactionStatus::Committed;
        let json = serde_json::to_string(&status).unwrap();
        assert_eq!(json, r#""committed""#);

        let status = TransactionStatus::RolledBack;
        let json = serde_json::to_string(&status).unwrap();
        assert_eq!(json, r#""rolledback""#);
    }

    #[test]
    fn test_batch_put_response_with_transaction() {
        let response = BatchPutResponse {
            stored: vec![],
            errors: vec![],
            transaction_id: Some("test-txn-123".to_string()),
            transaction_status: TransactionStatus::Committed,
        };

        let json = serde_json::to_string(&response).unwrap();
        assert!(json.contains("test-txn-123"));
        assert!(json.contains("committed"));
    }

    #[test]
    fn test_flow_control_config_default() {
        let config = FlowControlConfig::default();
        assert_eq!(config.max_bytes_per_second, 0);
        assert_eq!(config.initial_window_size, 256 * 1024);
        assert_eq!(config.max_window_size, 1024 * 1024);
        assert_eq!(config.min_window_size, 64 * 1024);
        assert!(config.dynamic_adjustment);
    }

    #[test]
    fn test_flow_control_config_with_rate_limit() {
        let config = FlowControlConfig::with_rate_limit(1_000_000);
        assert_eq!(config.max_bytes_per_second, 1_000_000);
        assert!(config.dynamic_adjustment);
    }

    #[test]
    fn test_flow_control_config_conservative() {
        let config = FlowControlConfig::conservative();
        assert_eq!(config.initial_window_size, 64 * 1024);
        assert_eq!(config.max_window_size, 256 * 1024);
        assert_eq!(config.min_window_size, 32 * 1024);
    }

    #[test]
    fn test_flow_control_config_aggressive() {
        let config = FlowControlConfig::aggressive();
        assert_eq!(config.initial_window_size, 512 * 1024);
        assert_eq!(config.max_window_size, 2 * 1024 * 1024);
        assert_eq!(config.min_window_size, 128 * 1024);
    }

    #[test]
    fn test_flow_controller_window_size() {
        let config = FlowControlConfig::default();
        let controller = FlowController::new(config.clone());
        assert_eq!(controller.window_size(), config.initial_window_size);
    }

    #[test]
    fn test_flow_controller_no_rate_limit() {
        // The default config has no rate limit, so no delay is ever needed.
        let config = FlowControlConfig::default();
        let controller = FlowController::new(config);

        let delay = controller.calculate_delay(1024);
        assert_eq!(delay, std::time::Duration::from_secs(0));
    }

    #[test]
    fn test_flow_controller_on_data_sent() {
        let config = FlowControlConfig::default();
        let mut controller = FlowController::new(config);

        controller.on_data_sent(1024);
        assert_eq!(controller.bytes_sent, 1024);

        controller.on_data_sent(2048);
        assert_eq!(controller.bytes_sent, 3072);
    }

    #[test]
    fn test_flow_controller_on_congestion() {
        let config = FlowControlConfig::default();
        let mut controller = FlowController::new(config.clone());

        let initial_window = controller.window_size();
        controller.on_congestion();

        assert!(controller.window_size() < initial_window);
        assert!(controller.window_size() >= config.min_window_size);
    }

    #[test]
    fn test_flow_controller_throughput() {
        let config = FlowControlConfig::default();
        let mut controller = FlowController::new(config);

        controller.on_data_sent(1024);

        let throughput = controller.current_throughput();
        assert!(throughput >= 0.0);
    }

    #[test]
    fn test_resume_token_encode_decode() {
        let token = ResumeToken::new("op-123".to_string(), 4096, Some("QmTest123".to_string()));

        let encoded = token.encode().unwrap();
        assert!(!encoded.is_empty());

        let decoded = ResumeToken::decode(&encoded).unwrap();
        assert_eq!(decoded.operation_id, "op-123");
        assert_eq!(decoded.offset, 4096);
        assert_eq!(decoded.cid, Some("QmTest123".to_string()));
    }

    #[test]
    fn test_resume_token_invalid() {
        // Not valid base64 at all.
        let result = ResumeToken::decode("invalid!!!base64");
        assert!(result.is_err());

        // Valid base64, but the payload is not JSON.
        let invalid_json = base64::Engine::encode(
            &base64::engine::general_purpose::URL_SAFE_NO_PAD,
            b"not json",
        );
        let result = ResumeToken::decode(&invalid_json);
        assert!(result.is_err());
    }

    #[test]
    fn test_operation_type_serialization() {
        let upload = OperationType::Upload;
        let json = serde_json::to_string(&upload).unwrap();
        assert_eq!(json, r#""upload""#);

        let download = OperationType::Download;
        let json = serde_json::to_string(&download).unwrap();
        assert_eq!(json, r#""download""#);
    }

    #[test]
    fn test_operation_status_serialization() {
        let status = OperationStatus::InProgress;
        let json = serde_json::to_string(&status).unwrap();
        assert_eq!(json, r#""inprogress""#);

        let status = OperationStatus::Cancelled;
        let json = serde_json::to_string(&status).unwrap();
        assert_eq!(json, r#""cancelled""#);
    }

    #[test]
    fn test_cancel_response_serialization() {
        let response = CancelResponse {
            operation_id: "op-456".to_string(),
            cancelled: true,
            resume_token: Some("token123".to_string()),
        };

        let json = serde_json::to_string(&response).unwrap();
        assert!(json.contains("op-456"));
        assert!(json.contains("true"));
        assert!(json.contains("token123"));
    }

    #[test]
    fn test_validate_cid_valid() {
        assert!(validation::validate_cid("QmTest123456").is_ok());
        assert!(validation::validate_cid(
            "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"
        )
        .is_ok());
        assert!(validation::validate_cid(
            "bafkreigh2akiscaildcqabsyg3dfr6chu3fgpregiymsck7e7aqa4s52zy"
        )
        .is_ok());
    }

    #[test]
    fn test_validate_cid_invalid() {
        assert!(validation::validate_cid("").is_err());
        assert!(validation::validate_cid("invalid").is_err());
        assert!(validation::validate_cid("Qm").is_err());
    }

    #[test]
    fn test_validate_offset_valid() {
        assert!(validation::validate_offset(0, 1000).is_ok());
        assert!(validation::validate_offset(500, 1000).is_ok());
        assert!(validation::validate_offset(999, 1000).is_ok());
    }

    #[test]
    fn test_validate_offset_invalid() {
        assert!(validation::validate_offset(1000, 1000).is_err());
        assert!(validation::validate_offset(2000, 1000).is_err());
    }

    #[test]
    fn test_validate_chunk_size_valid() {
        // Both bounds (1 KiB and 10 MiB) are inclusive.
        assert!(validation::validate_chunk_size(1024).is_ok());
        assert!(validation::validate_chunk_size(64 * 1024).is_ok());
        assert!(validation::validate_chunk_size(10 * 1024 * 1024).is_ok());
    }

    #[test]
    fn test_validate_chunk_size_invalid() {
        assert!(validation::validate_chunk_size(512).is_err());
        assert!(validation::validate_chunk_size(20 * 1024 * 1024).is_err());
    }

    #[test]
    fn test_validate_rate_limit_valid() {
        // Zero means "no limit"; the 10 GiB/s maximum is inclusive.
        assert!(validation::validate_rate_limit(0).is_ok());
        assert!(validation::validate_rate_limit(1_000_000).is_ok());
        assert!(validation::validate_rate_limit(10 * 1024 * 1024 * 1024).is_ok());
    }

    #[test]
    fn test_validate_rate_limit_invalid() {
        assert!(validation::validate_rate_limit(20 * 1024 * 1024 * 1024).is_err());
    }

    #[test]
    fn test_validate_batch_size_valid() {
        assert!(validation::validate_batch_size(1).is_ok());
        assert!(validation::validate_batch_size(100).is_ok());
        assert!(validation::validate_batch_size(1000).is_ok());
    }

    #[test]
    fn test_validate_batch_size_invalid() {
        assert!(validation::validate_batch_size(0).is_err());
        assert!(validation::validate_batch_size(1001).is_err());
        assert!(validation::validate_batch_size(5000).is_err());
    }

    #[test]
    fn test_flow_control_config_validation_valid() {
        let config = FlowControlConfig::default();
        assert!(config.validate().is_ok());

        let config = FlowControlConfig::conservative();
        assert!(config.validate().is_ok());

        let config = FlowControlConfig::aggressive();
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_flow_control_config_validation_invalid() {
        // min_window_size larger than initial_window_size.
        let config = FlowControlConfig {
            max_bytes_per_second: 0,
            initial_window_size: 64 * 1024,
            max_window_size: 1024 * 1024,
            min_window_size: 128 * 1024,
            dynamic_adjustment: true,
        };
        assert!(config.validate().is_err());

        // initial_window_size larger than max_window_size.
        let config = FlowControlConfig {
            max_bytes_per_second: 0,
            initial_window_size: 2 * 1024 * 1024,
            max_window_size: 1024 * 1024,
            min_window_size: 64 * 1024,
            dynamic_adjustment: true,
        };
        assert!(config.validate().is_err());

        // Rate limit above the 10 GiB/s maximum.
        let config = FlowControlConfig {
            max_bytes_per_second: 20 * 1024 * 1024 * 1024,
            initial_window_size: 256 * 1024,
            max_window_size: 1024 * 1024,
            min_window_size: 64 * 1024,
            dynamic_adjustment: true,
        };
        assert!(config.validate().is_err());
    }

    #[test]
    fn test_concurrency_config_default() {
        let config = ConcurrencyConfig::default();
        assert_eq!(config.max_concurrent_tasks, 100);
        assert!(config.parallel_enabled);
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_concurrency_config_conservative() {
        let config = ConcurrencyConfig::conservative();
        assert_eq!(config.max_concurrent_tasks, 50);
        assert!(config.parallel_enabled);
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_concurrency_config_aggressive() {
        let config = ConcurrencyConfig::aggressive();
        assert_eq!(config.max_concurrent_tasks, 200);
        assert!(config.parallel_enabled);
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_concurrency_config_sequential() {
        let config = ConcurrencyConfig::sequential();
        assert_eq!(config.max_concurrent_tasks, 1);
        assert!(!config.parallel_enabled);
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_concurrency_config_validation_invalid() {
        let config = ConcurrencyConfig {
            max_concurrent_tasks: 0,
            parallel_enabled: true,
        };
        assert!(config.validate().is_err());
    }

    #[test]
    fn test_concurrency_config_validation_valid() {
        // A zero task cap is fine when parallelism is disabled.
        let config = ConcurrencyConfig {
            max_concurrent_tasks: 0,
            parallel_enabled: false,
        };
        assert!(config.validate().is_ok());

        let config = ConcurrencyConfig {
            max_concurrent_tasks: 100,
            parallel_enabled: true,
        };
        assert!(config.validate().is_ok());
    }
}