unfurl 0.2.1

A tool for expanding links in text
use std::thread;
use std::sync::mpsc;

use bytes::Bytes;
use futures::{stream, StreamExt};
use once_cell::sync::OnceCell;
use reqwest;

use crate::error;

const CONCURRENT_REQUESTS: usize = 5;

static SERVICE: OnceCell<Service> = OnceCell::new();

#[derive(Debug)]
pub struct Request {
  key: String,
  req: reqwest::RequestBuilder,
}

impl Request{
  pub fn new(key: &str, req: reqwest::RequestBuilder) -> Self {
    Request{
      key: key.to_string(),
      req: req,
    }
  }
}

#[derive(Debug)]
struct Requests {
  tx: mpsc::Sender<Vec<Response>>,
  reqs: Vec<Request>,
}

#[derive(Debug)]
pub struct Response {
  key: String,
  data: Result<Bytes, error::Error>,
}

impl Response {
  pub fn key<'a>(&'a self) -> &'a str {
    &self.key
  }

  pub fn data<'a>(&'a self) -> &'a Result<Bytes, error::Error> {
    &self.data
  }
}

pub struct Service {
  tx: mpsc::Sender<Requests>,
}

impl Service {
  pub fn instance() -> &'static Service {
    SERVICE.get_or_init(|| { Self::new() })
  }

  fn new() -> Service {
    let (q_tx, q_rx) = mpsc::channel();
    let svc = Service{tx: q_tx};
    thread::spawn(|| { Service::run(q_rx) });
    svc
  }

  pub fn fetch_requests(&self, reqs: Vec<Request>) -> Result<mpsc::Receiver<Vec<Response>>, error::Error> {
    let (p_tx, p_rx) = mpsc::channel();
    match self.tx.send(Requests{tx: p_tx, reqs: reqs}) {
      Ok(_)  => Ok(p_rx),
      Err(_) => Err(error::Error::SendError),
    }
  }

  fn run(rx: mpsc::Receiver<Requests>) {
    tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on(async {
      loop {
        let x = match rx.recv() {
          Ok(x)    => x,
          Err(err) => {
            println!("* * * Could not receive: {}", err);
            return;
          },
        };
        let rsps = fetch_n(CONCURRENT_REQUESTS, x.reqs).await;
        if let Err(err) = x.tx.send(rsps) {
          println!("*** Could not send: {}", err);
          return;
        }
      }
    }) 
  }
}

async fn fetch_n(n: usize, reqs: Vec<Request>) -> Vec<Response> {
  stream::iter(reqs)
    .map(|req| {
      async move {
        Response{
          key: req.key.clone(),
          data: match req.req.send().await {
            Err(err) => Err(err.into()),
            Ok(rsp)  => match rsp.error_for_status() {
              Ok(rsp) => match rsp.bytes().await {
                Ok(data) => Ok(data),
                Err(err) => Err(err.into()),
              },
              Err(err) => Err(err.into()),
            },
          }
        }
      }
    })
    .buffer_unordered(n)
    .collect()
    .await
}