futures-buffered 0.2.13

future concurrency primitives with emphasis on performance and low memory usage
Documentation
#![cfg(not(miri))]
use std::time::Instant;

use futures::StreamExt;
use futures_buffered::BufferedStreamExt;
use reqwest::{Client, Error};

static URLS: &[&str] = &[
    "https://api.ipify.org/",
    "https://www.boredapi.com/api/activity",
    "https://random.dog/woof.json",
];

#[tokio::test]
async fn futures_util() -> Result<(), Error> {
    let http = Client::new();

    let start = Instant::now();

    futures::stream::iter(URLS)
        .cycle()
        .take(256)
        .map(|&url| {
            let client = &http;
            async move {
                let resp = client.get(url).send().await?;
                let status = resp.status();
                let text = resp.text().await;
                Ok::<_, Error>((url, status, text))
            }
        })
        .buffer_unordered(8)
        .for_each(|res| async {
            if let Ok((url, status, Ok(text))) = res {
                println!("{url} ({status}) {text}");
            }
        })
        .await;

    println!("end {:?}", start.elapsed());

    Ok(())
}

#[tokio::test]
async fn futures_buffered() -> Result<(), Error> {
    let http = Client::new();

    let start = Instant::now();

    futures::stream::iter(URLS)
        .cycle()
        .take(256)
        .map(|&url| {
            let client = &http;
            async move {
                let resp = client.get(url).send().await?;
                let status = resp.status();
                let text = resp.text().await;
                Ok::<_, Error>((url, status, text))
            }
        })
        .buffered_unordered(8)
        .for_each(|res| async {
            if let Ok((url, status, Ok(text))) = res {
                println!("{url} ({status}) {text}");
            }
        })
        .await;

    println!("end {:?}", start.elapsed());

    Ok(())
}