//! reqx 0.1.35
//!
//! Rust HTTP transport client for API SDK libraries with retry, timeout,
//! idempotency, proxy, and pluggable TLS backends.
//!
//! See the crate documentation for details.
use std::collections::{BTreeMap, BTreeSet};
use std::error::Error;
use std::io::Cursor;
use std::sync::{Arc, Mutex};

#[cfg(feature = "_async")]
use reqx::advanced::{
    AsyncResumableUploadBackend, AsyncResumableUploader, ResumableUploadOptions, UploadedPart,
};
#[cfg(feature = "_async")]
use thiserror::Error;

/// Error type returned by the demo backend.
///
/// `thiserror`'s derive makes `Display` render the `message` field directly
/// (via the `#[error("{message}")]` attribute), which is all the uploader
/// needs to surface a simulated transient failure.
#[cfg(feature = "_async")]
#[derive(Debug, Error)]
#[error("{message}")]
struct DemoError {
    // Human-readable description; becomes the Display output.
    message: String,
}

/// In-memory upload backend used to demonstrate checkpoint-based resume.
///
/// Both maps are behind `Arc<Mutex<_>>` so the backend can be shared by
/// reference with the async uploader while still mutating state.
#[cfg(feature = "_async")]
#[derive(Default)]
struct DemoBackend {
    // Successfully uploaded parts, keyed by part number.
    parts: Arc<Mutex<BTreeMap<u32, Vec<u8>>>>,
    // Part numbers whose next `upload_part` call should fail exactly once.
    fail_once_parts: Arc<Mutex<BTreeSet<u32>>>,
}

#[cfg(feature = "_async")]
impl DemoBackend {
    /// Arrange for the next `upload_part` call with this part number to
    /// fail exactly once (the failure set entry is consumed on use).
    fn fail_once_for_part(&self, part_number: u32) {
        self.fail_once_parts
            .lock()
            .expect("lock fail_once_parts")
            .insert(part_number);
    }
}

#[cfg(feature = "_async")]
impl AsyncResumableUploadBackend for DemoBackend {
    type Error = DemoError;

    /// The demo always hands out the same upload id.
    async fn create_upload(&self) -> Result<String, Self::Error> {
        Ok(String::from("demo-upload-1"))
    }

    /// Store one chunk, failing once for any part number previously
    /// registered via `fail_once_for_part`.
    async fn upload_part(
        &self,
        _upload_id: &str,
        part_number: u32,
        chunk: &[u8],
    ) -> Result<UploadedPart, Self::Error> {
        // Consume a pending one-shot failure, if one was registered.
        let should_fail = self
            .fail_once_parts
            .lock()
            .expect("lock fail_once_parts")
            .remove(&part_number);
        if should_fail {
            let message = format!("simulated transient failure on part {part_number}");
            return Err(DemoError { message });
        }

        // Record the part so a resumed upload can skip it.
        self.parts
            .lock()
            .expect("lock parts")
            .insert(part_number, chunk.to_vec());
        let etag = format!("etag-{part_number}");
        Ok(UploadedPart {
            part_number,
            etag,
            size: chunk.len(),
            checksum: None,
        })
    }

    /// Finish the upload; the demo just reports what was completed.
    async fn complete_upload(
        &self,
        upload_id: &str,
        parts: &[UploadedPart],
    ) -> Result<(), Self::Error> {
        println!("complete upload_id={upload_id} parts={}", parts.len());
        Ok(())
    }
}

#[cfg(feature = "_async")]
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Prime the backend so part 2 fails exactly once.
    let backend = DemoBackend::default();
    backend.fail_once_for_part(2);

    // Single-attempt uploader: the injected failure surfaces as an error
    // that carries a checkpoint describing the parts already uploaded.
    let first_options = ResumableUploadOptions::new()
        .with_part_size(4)
        .with_max_attempts(1)
        .with_jitter_ratio(0.0);
    let first_uploader = AsyncResumableUploader::new(first_options);
    let mut source = Cursor::new(b"abcdefgh".to_vec());
    let failure = first_uploader
        .upload(&backend, &mut source)
        .await
        .expect_err("first attempt should fail to demo checkpoint-based resume");
    let checkpoint = failure
        .into_checkpoint()
        .expect("part failure should include checkpoint");
    println!(
        "checkpoint persisted: upload_id={} completed_parts={}",
        checkpoint.upload_id,
        checkpoint.completed_parts.len()
    );

    // Resume from the checkpoint with retries enabled; completed parts are
    // skipped and the previously failed part goes through on retry.
    let retry_options = ResumableUploadOptions::new()
        .with_part_size(4)
        .with_max_attempts(2)
        .with_jitter_ratio(0.0);
    let resumed_uploader = AsyncResumableUploader::new(retry_options);
    let mut replay_source = Cursor::new(b"abcdefgh".to_vec());
    let outcome = resumed_uploader
        .resume(&backend, &mut replay_source, checkpoint)
        .await?;
    println!(
        "resumed={} total_parts={} total_bytes={}",
        outcome.resumed, outcome.total_parts, outcome.total_bytes
    );
    Ok(())
}

// Fallback entry point when no async TLS feature is enabled: the example
// cannot run, so print a hint and exit normally.
#[cfg(not(feature = "_async"))]
fn main() {
    eprintln!("enable an `async-tls-*` feature to run this example");
}