async-curl 0.7.0

An asynchronous implementation to perform curl operations with tokio.
Documentation
use std::fmt::Debug;
use std::time::Duration;

use async_trait::async_trait;
use curl::easy::{Easy2, Handler};
use curl::multi::Multi;
use log::trace;
use tokio::runtime::{Builder, Runtime};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::oneshot;
use tokio::task::LocalSet;

use crate::error::Error;

/// Asynchronous execution of curl `Easy2` transfers.
///
/// Both methods take ownership of a configured `Easy2` and, on success, hand the
/// same `Easy2` back once the transfer has finished, so the caller can read the
/// handler's collected data and the response metadata.
#[async_trait]
pub trait Actor<H: Handler + Debug + Send + 'static> {
    /// Performs `easy2` and returns it after completion.
    ///
    /// `CurlActor` implements this with curl's multi interface.
    async fn send_request(&self, easy2: Easy2<H>) -> Result<Easy2<H>, Error<H>>;

    /// Performs `easy2` and returns it after completion.
    ///
    /// `CurlActor` implements this with `Easy2::perform` (the easy interface).
    async fn perform_easy2(&self, easy2: Easy2<H>) -> Result<Easy2<H>, Error<H>>;
}

/// CurlActor is responsible for performing
/// the constructed Easy2 object asynchronously
/// in a background thread.
/// ```
/// use async_curl::actor::{Actor, CurlActor};
/// use curl::easy::{Easy2, Handler, WriteError};
///
/// #[derive(Debug, Clone, Default)]
/// pub struct ResponseHandler {
///     data: Vec<u8>,
/// }
///
/// impl Handler for ResponseHandler {
///     /// This will store the response from the server
///     /// to the data vector.
///     fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
///         self.data.extend_from_slice(data);
///         Ok(data.len())
///     }
/// }
///
/// impl ResponseHandler {
///     /// Instantiation of the ResponseHandler
///     /// and initialize the data vector.
///     pub fn new() -> Self {
///         Self::default()
///     }
///
///     /// This will consume the object and
///     /// give the data to the caller
///     pub fn get_data(self) -> Vec<u8> {
///         self.data
///     }
/// }
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>>{
/// let curl = CurlActor::new();
/// let mut easy2 = Easy2::new(ResponseHandler::new());
///
/// easy2.url("https://www.rust-lang.org").unwrap();
/// easy2.get(true).unwrap();
///
/// let response = curl.send_request(easy2).await.unwrap();
/// eprintln!("{:?}", response.get_ref());
///
/// Ok(())
/// # }
/// ```
///
/// Example of multiple requests executed
/// concurrently.
///
/// ```
/// use async_curl::actor::{Actor, CurlActor};
/// use curl::easy::{Easy2, Handler, WriteError};
///
/// #[derive(Debug, Clone, Default)]
/// pub struct ResponseHandler {
///     data: Vec<u8>,
/// }
///
/// impl Handler for ResponseHandler {
///     /// This will store the response from the server
///     /// to the data vector.
///     fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
///         self.data.extend_from_slice(data);
///         Ok(data.len())
///     }
/// }
///
/// impl ResponseHandler {
///     /// Instantiation of the ResponseHandler
///     /// and initialize the data vector.
///     pub fn new() -> Self {
///         Self::default()
///     }
///
///     /// This will consume the object and
///     /// give the data to the caller
///     pub fn get_data(self) -> Vec<u8> {
///         self.data
///     }
/// }
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let actor = CurlActor::new();
/// let mut easy2 = Easy2::new(ResponseHandler::new());
/// easy2.url("https://www.rust-lang.org").unwrap();
/// easy2.get(true).unwrap();
///
/// let actor1 = actor.clone();
/// let spawn1 = tokio::spawn(async move {
///     let response = actor1.send_request(easy2).await;
///     let mut response = response.unwrap();
///
///     // Response body
///     eprintln!(
///         "Task 1 : {}",
///         String::from_utf8_lossy(&response.get_ref().to_owned().get_data())
///     );
///     // Response status code
///     let status_code = response.response_code().unwrap();
///     eprintln!("Task 1 : {}", status_code);
/// });
///
/// let mut easy2 = Easy2::new(ResponseHandler::new());
/// easy2.url("https://www.rust-lang.org").unwrap();
/// easy2.get(true).unwrap();
///
/// let spawn2 = tokio::spawn(async move {
///     let response = actor.send_request(easy2).await;
///     let mut response = response.unwrap();
///
///     // Response body
///     eprintln!(
///         "Task 2 : {}",
///         String::from_utf8_lossy(&response.get_ref().to_owned().get_data())
///     );
///     // Response status code
///     let status_code = response.response_code().unwrap();
///     eprintln!("Task 2 : {}", status_code);
/// });
/// let (_, _) = tokio::join!(spawn1, spawn2);
///
/// Ok(())
/// # }
/// ```
///
use std::sync::Arc;
use std::thread::JoinHandle;

/// A unit of work handed to the background actor.
///
/// It bundles the `Easy2` to perform, the oneshot channel on which the outcome is
/// delivered back to the waiting caller, and the curl interface to use.
#[derive(Debug)]
pub struct Request<H>(
    /// The transfer to perform; handed back to the caller on success.
    Easy2<H>,
    /// One-shot reply channel that receives the outcome of the transfer.
    oneshot::Sender<Result<Easy2<H>, Error<H>>>,
    /// Which curl interface should drive the transfer.
    TransferType,
)
where
    H: Handler + Debug + Send + 'static;

/// Selects which curl interface the background actor uses to perform a request.
///
/// This is a plain fieldless tag, so it is `Copy` and comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TransferType {
    /// Drive the transfer with curl's multi interface (`curl::multi::Multi`).
    Multi,
    /// Perform the transfer directly with the easy interface (`Easy2::perform`).
    Easy2,
}

/// State shared (through an `Arc`) by every clone of a `CurlActor`.
///
/// Both fields are `Option`s so that the `Drop` implementation can `take()` them
/// out: first closing the request channel, then joining the background thread.
struct Inner<H: Handler + Debug + Send + 'static> {
    /// Queue into the background actor; `None` only once shutdown has begun.
    request_sender: Option<Sender<Request<H>>>,
    /// Handle of the background actor thread; `None` only once it has been taken for joining.
    join_handle: Option<JoinHandle<()>>,
}

impl<H> Drop for Inner<H>
where
    H: Handler + Debug + Send + 'static,
{
    /// Shuts the background actor down.
    ///
    /// Dropping the request sender closes the channel, which ends the actor's
    /// receive loop. The actor thread is then joined, so this blocks the dropping
    /// thread until the background thread exits.
    fn drop(&mut self) {
        // Take and drop the sender so the background actor sees the channel closed.
        if let Some(sender) = self.request_sender.take() {
            trace!("Dropping request sender to signal background actor to shut down.");
            drop(sender);
            trace!("Request sender dropped, signaling background actor to shut down.");
        }

        let Some(handle) = self.join_handle.take() else {
            return;
        };

        // Joining the current thread would deadlock (or panic on some platforms),
        // so if the last handle is dropped on the actor thread itself, detach instead.
        if handle.thread().id() == std::thread::current().id() {
            trace!("CurlActor dropped on its own background thread; detaching instead of joining.");
            return;
        }

        trace!("Attempting to join background actor thread for graceful shutdown...");
        if handle.join().is_ok() {
            trace!("Background actor thread joined successfully.");
        } else {
            // The panic payload is opaque; report it rather than claiming success.
            trace!("Background actor thread panicked before shutting down.");
        }
    }
}

/// Cloneable handle to a background curl actor.
///
/// Every clone shares the same background thread and request queue. When the
/// last clone is dropped, the request channel is closed and the background
/// thread is joined.
pub struct CurlActor<H>
where
    H: Handler + Debug + Send + 'static,
{
    inner: Arc<Inner<H>>,
}

// Implemented by hand: `#[derive(Clone)]` would add an `H: Clone` bound, even
// though cloning the handle only bumps the `Arc` reference count.
impl<H> Clone for CurlActor<H>
where
    H: Handler + Debug + Send + 'static,
{
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}

impl<H> Default for CurlActor<H>
where
    H: Handler + Debug + Send + 'static,
{
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl<H> Actor<H> for CurlActor<H>
where
    H: Handler + Debug + Send + 'static,
{
    /// This will send Easy2 into the background task that will perform
    /// curl asynchronously, await the response in the oneshot receiver and
    /// return Easy2 back to the caller. This uses the curl multi interface to perform the request.
    async fn send_request(&self, easy2: Easy2<H>) -> Result<Easy2<H>, Error<H>> {
        let (oneshot_sender, oneshot_receiver) = oneshot::channel::<Result<Easy2<H>, Error<H>>>();
        self.inner
            .request_sender
            .as_ref()
            .expect("request_sender missing")
            .send(Request(easy2, oneshot_sender, TransferType::Multi))
            .await?;
        oneshot_receiver.await?
    }

    /// This will send Easy2 into the background task that will perform
    /// curl asynchronously, await the response in the oneshot receiver and
    /// return Easy2 back to the caller. This uses the curl easy2 interface to perform the request.
    async fn perform_easy2(&self, easy2: Easy2<H>) -> Result<Easy2<H>, Error<H>> {
        let (oneshot_sender, oneshot_receiver) = oneshot::channel::<Result<Easy2<H>, Error<H>>>();
        self.inner
            .request_sender
            .as_ref()
            .expect("request_sender missing")
            .send(Request(easy2, oneshot_sender, TransferType::Easy2))
            .await?;
        oneshot_receiver.await?
    }
}

impl<H> CurlActor<H>
where
    H: Handler + Debug + Send + 'static,
{
    /// Request-queue capacity used by [`CurlActor::new`] and [`CurlActor::new_runtime`].
    const DEFAULT_CHANNEL_CAPACITY: usize = 1;

    /// Creates a new `CurlActor` that performs curl transfers on a dedicated
    /// background thread, driven by its own current-thread tokio runtime, so other
    /// tasks are not blocked.
    ///
    /// # Panics
    ///
    /// Panics if the tokio runtime cannot be built or the background thread
    /// cannot be spawned.
    pub fn new() -> Self {
        let runtime = Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed to build the tokio runtime for the curl actor");
        Self::new_runtime(runtime)
    }

    /// Like [`CurlActor::new`], but the background task is driven by the
    /// caller-supplied `runtime`, which is moved onto the background thread.
    ///
    /// # Panics
    ///
    /// Panics if the background thread cannot be spawned.
    pub fn new_runtime(runtime: Runtime) -> Self {
        Self::new_runtime_with_capacity(runtime, Self::DEFAULT_CHANNEL_CAPACITY)
    }

    /// Create a new CurlActor with a user-provided runtime and configurable channel capacity.
    ///
    /// `capacity` bounds how many requests may be queued for the background actor
    /// before `send_request` / `perform_easy2` wait for room.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero (tokio's bounded mpsc channel requires a
    /// non-zero buffer) or if the background thread cannot be spawned.
    pub fn new_runtime_with_capacity(runtime: Runtime, capacity: usize) -> Self {
        let (request_sender, request_receiver) = mpsc::channel::<Request<H>>(capacity);

        let handle = Self::spawn_actor(runtime, request_receiver);

        Self {
            inner: Arc::new(Inner {
                request_sender: Some(request_sender),
                join_handle: Some(handle),
            }),
        }
    }

    /// Spawns the dedicated OS thread that owns `runtime` and serves requests.
    ///
    /// Each received request runs as its own local task, so several transfers can
    /// be in flight at once. The receive loop ends once every sender is dropped;
    /// the thread then exits when the `LocalSet` has finished its spawned tasks.
    fn spawn_actor(runtime: Runtime, mut request_receiver: Receiver<Request<H>>) -> JoinHandle<()> {
        std::thread::Builder::new()
            // A named thread is easier to identify in debuggers and panic messages.
            .name("async-curl-actor".to_owned())
            .spawn(move || {
                let local = LocalSet::new();
                local.spawn_local(async move {
                    // `recv` yields `None` once every `Sender` has been dropped.
                    while let Some(Request(easy2, oneshot_sender, transfer_type)) =
                        request_receiver.recv().await
                    {
                        tokio::task::spawn_local(async move {
                            let response = match transfer_type {
                                TransferType::Easy2 => perform_curl_easy2(easy2).await,
                                TransferType::Multi => perform_curl_multi(easy2).await,
                            };
                            if let Err(res) = oneshot_sender.send(response) {
                                trace!("Warning! The receiver has been dropped. {:?}", res);
                            }
                        });
                    }
                });
                runtime.block_on(local);
            })
            .expect("failed to spawn the curl actor thread")
    }
}

async fn perform_curl_multi<H: Handler + Debug + Send + 'static>(
    easy2: Easy2<H>,
) -> Result<Easy2<H>, Error<H>> {
    trace!("perform_curl_multi: starting curl multi operation");
    tokio::task::spawn_blocking(move || -> Result<Easy2<H>, Error<H>> {
        let multi = Multi::new();
        let handle = multi.add2(easy2).map_err(|e| Error::Multi(e))?;

        while multi.perform().map_err(|e| Error::Multi(e))? != 0 {
            let timeout_result = multi
                .get_timeout()
                .map(|d| d.unwrap_or_else(|| Duration::from_secs(2)));

            let timeout = match timeout_result {
                Ok(duration) => duration,
                Err(multi_error) => {
                    if !multi_error.is_call_perform() {
                        return Err(Error::Multi(multi_error));
                    }
                    Duration::ZERO
                }
            };

            if !timeout.is_zero() {
                trace!(
                    "perform_curl_multi: waiting for IO or timeout {:?}",
                    timeout
                );
                let ready = multi.wait(&mut [], timeout).map_err(Error::Multi)?;
                trace!(
                    "perform_curl_multi: wait completed, {} sockets ready",
                    ready
                );
            }
        }

        // Inspect messages for transfer-level errors.
        let mut transfer_error: Option<Error<H>> = None;
        multi.messages(|msg| {
            if let Some(Err(e)) = msg.result() {
                transfer_error = Some(Error::Curl(e));
            }
        });

        // Always attempt to remove the handle to clean up resources. If there was
        // a transfer error prefer returning that error, but still try to perform
        // the removal and log any cleanup failure.
        let cleanup = multi.remove2(handle).map_err(|e| Error::Multi(e));

        if let Some(e) = transfer_error {
            if let Err(ref clean_err) = cleanup {
                trace!(
                    "perform_curl_multi: remove2 failed during cleanup: {:?}",
                    clean_err
                );
            }
            Err(e)
        } else {
            cleanup
        }
    })
    .await
    .map_err(Error::JoinError)?
}

async fn perform_curl_easy2<H: Handler + Debug + Send + 'static>(
    easy2: Easy2<H>,
) -> Result<Easy2<H>, Error<H>> {
    trace!("perform_curl_easy2: starting curl easy2 operation");
    tokio::task::spawn_blocking(move || -> Result<Easy2<H>, Error<H>> {
        easy2.perform().map_err(|e| Error::Curl(e))?;
        Ok(easy2)
    })
    .await
    .map_err(Error::JoinError)?
}