Skip to main content

async_curl/
actor.rs

1use std::fmt::Debug;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use curl::easy::{Easy2, Handler};
6use curl::multi::Multi;
7use log::trace;
8use tokio::runtime::{Builder, Runtime};
9use tokio::sync::mpsc::{self, Receiver, Sender};
10use tokio::sync::oneshot;
11use tokio::task::LocalSet;
12
13use crate::error::Error;
14
15#[async_trait]
16pub trait Actor<H>
17where
18    H: Handler + Debug + Send + 'static,
19{
20    async fn send_request(&self, easy2: Easy2<H>) -> Result<Easy2<H>, Error<H>>;
21    async fn perform_easy2(&self, easy2: Easy2<H>) -> Result<Easy2<H>, Error<H>>;
22}
23
/// CurlActor is responsible for performing
/// the constructed Easy2 object in the background
/// so that it runs asynchronously.
27/// ```
28/// use async_curl::actor::{Actor, CurlActor};
29/// use curl::easy::{Easy2, Handler, WriteError};
30///
31/// #[derive(Debug, Clone, Default)]
32/// pub struct ResponseHandler {
33///     data: Vec<u8>,
34/// }
35///
36/// impl Handler for ResponseHandler {
37///     /// This will store the response from the server
38///     /// to the data vector.
39///     fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
40///         self.data.extend_from_slice(data);
41///         Ok(data.len())
42///     }
43/// }
44///
45/// impl ResponseHandler {
46///     /// Instantiation of the ResponseHandler
47///     /// and initialize the data vector.
48///     pub fn new() -> Self {
49///         Self::default()
50///     }
51///
///     /// This will consume the object and
///     /// give the data to the caller
54///     pub fn get_data(self) -> Vec<u8> {
55///         self.data
56///     }
57/// }
58///
59/// # #[tokio::main(flavor = "current_thread")]
60/// # async fn main() -> Result<(), Box<dyn std::error::Error>>{
61/// let curl = CurlActor::new();
62/// let mut easy2 = Easy2::new(ResponseHandler::new());
63///
64/// easy2.url("https://www.rust-lang.org").unwrap();
65/// easy2.get(true).unwrap();
66///
67/// let response = curl.send_request(easy2).await.unwrap();
68/// eprintln!("{:?}", response.get_ref());
69///
70/// Ok(())
71/// # }
72/// ```
73///
/// Example of multiple requests executed
/// at the same time.
76///
77/// ```
78/// use async_curl::actor::{Actor, CurlActor};
79/// use curl::easy::{Easy2, Handler, WriteError};
80///
81/// #[derive(Debug, Clone, Default)]
82/// pub struct ResponseHandler {
83///     data: Vec<u8>,
84/// }
85///
86/// impl Handler for ResponseHandler {
87///     /// This will store the response from the server
88///     /// to the data vector.
89///     fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
90///         self.data.extend_from_slice(data);
91///         Ok(data.len())
92///     }
93/// }
94///
95/// impl ResponseHandler {
96///     /// Instantiation of the ResponseHandler
97///     /// and initialize the data vector.
98///     pub fn new() -> Self {
99///         Self::default()
100///     }
101///
///     /// This will consume the object and
///     /// give the data to the caller
104///     pub fn get_data(self) -> Vec<u8> {
105///         self.data
106///     }
107/// }
108///
109/// # #[tokio::main(flavor = "current_thread")]
110/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
111/// let actor = CurlActor::new();
112/// let mut easy2 = Easy2::new(ResponseHandler::new());
113/// easy2.url("https://www.rust-lang.org").unwrap();
114/// easy2.get(true).unwrap();
115///
116/// let actor1 = actor.clone();
117/// let spawn1 = tokio::spawn(async move {
118///     let response = actor1.send_request(easy2).await;
119///     let mut response = response.unwrap();
120///
121///     // Response body
122///     eprintln!(
123///         "Task 1 : {}",
124///         String::from_utf8_lossy(&response.get_ref().to_owned().get_data())
125///     );
126///     // Response status code
127///     let status_code = response.response_code().unwrap();
128///     eprintln!("Task 1 : {}", status_code);
129/// });
130///
131/// let mut easy2 = Easy2::new(ResponseHandler::new());
132/// easy2.url("https://www.rust-lang.org").unwrap();
133/// easy2.get(true).unwrap();
134///
135/// let spawn2 = tokio::spawn(async move {
136///     let response = actor.send_request(easy2).await;
137///     let mut response = response.unwrap();
138///
139///     // Response body
140///     eprintln!(
141///         "Task 2 : {}",
142///         String::from_utf8_lossy(&response.get_ref().to_owned().get_data())
143///     );
144///     // Response status code
145///     let status_code = response.response_code().unwrap();
146///     eprintln!("Task 2 : {}", status_code);
147/// });
148/// let (_, _) = tokio::join!(spawn1, spawn2);
149///
150/// Ok(())
151/// # }
152/// ```
153///
154use std::sync::Arc;
155use std::thread::JoinHandle;
156
157/// This contains the Easy2 object and a oneshot sender channel when passing into the
158/// background task to perform Curl asynchronously.
159#[derive(Debug)]
160pub struct Request<H: Handler + Debug + Send + 'static>(
161    Easy2<H>,
162    oneshot::Sender<Result<Easy2<H>, Error<H>>>,
163    TransferType,
164);
165
/// Selects which curl interface the background task uses for a request.
///
/// The type is a plain field-less tag, so it is `Copy` and comparable; this
/// lets callers store, duplicate and test it without extra ceremony.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransferType {
    /// Drive the transfer through the curl multi interface.
    Multi,
    /// Drive the transfer through a direct `Easy2::perform` call.
    Easy2,
}
172
173struct Inner<H>
174where
175    H: Handler + Debug + Send + 'static,
176{
177    request_sender: Option<Sender<Request<H>>>,
178    join_handle: Option<JoinHandle<()>>,
179}
180
181impl<H> Drop for Inner<H>
182where
183    H: Handler + Debug + Send + 'static,
184{
185    fn drop(&mut self) {
186        // Take and drop the sender so the background actor sees channel closed.
187        if let Some(sender) = self.request_sender.take() {
188            trace!("Dropping request sender to signal background actor to shut down.");
189            drop(sender);
190            trace!("Request sender dropped, signaling background actor to shut down.");
191        }
192        // Join the background thread to ensure graceful shutdown.
193        if let Some(handle) = self.join_handle.take() {
194            trace!("Attempting to join background actor thread for graceful shutdown...");
195            let _ = handle.join();
196            trace!("Background actor thread joined successfully.");
197        }
198    }
199}
200
201#[derive(Clone)]
202pub struct CurlActor<H>
203where
204    H: Handler + Debug + Send + 'static,
205{
206    inner: Arc<Inner<H>>,
207}
208
209impl<H> Default for CurlActor<H>
210where
211    H: Handler + Debug + Send + 'static,
212{
213    fn default() -> Self {
214        Self::new()
215    }
216}
217
218#[async_trait]
219impl<H> Actor<H> for CurlActor<H>
220where
221    H: Handler + Debug + Send + 'static,
222{
223    /// This will send Easy2 into the background task that will perform
224    /// curl asynchronously, await the response in the oneshot receiver and
225    /// return Easy2 back to the caller. This uses the curl multi interface to perform the request.
226    async fn send_request(&self, easy2: Easy2<H>) -> Result<Easy2<H>, Error<H>> {
227        let (oneshot_sender, oneshot_receiver) = oneshot::channel::<Result<Easy2<H>, Error<H>>>();
228        self.inner
229            .request_sender
230            .as_ref()
231            .expect("request_sender missing")
232            .send(Request(easy2, oneshot_sender, TransferType::Multi))
233            .await?;
234        oneshot_receiver.await?
235    }
236
237    /// This will send Easy2 into the background task that will perform
238    /// curl asynchronously, await the response in the oneshot receiver and
239    /// return Easy2 back to the caller. This uses the curl easy2 interface to perform the request.
240    async fn perform_easy2(&self, easy2: Easy2<H>) -> Result<Easy2<H>, Error<H>> {
241        let (oneshot_sender, oneshot_receiver) = oneshot::channel::<Result<Easy2<H>, Error<H>>>();
242        self.inner
243            .request_sender
244            .as_ref()
245            .expect("request_sender missing")
246            .send(Request(easy2, oneshot_sender, TransferType::Easy2))
247            .await?;
248        oneshot_receiver.await?
249    }
250}
251
252impl<H> CurlActor<H>
253where
254    H: Handler + Debug + Send + 'static,
255{
256    /// This creates the new instance of CurlActor to handle Curl perform asynchronously using Curl Multi
257    /// in a background thread to avoid blocking of other tasks.
258    pub fn new() -> Self {
259        let runtime = Builder::new_current_thread().enable_all().build().unwrap();
260        let (request_sender, request_receiver) = mpsc::channel::<Request<H>>(1);
261
262        let handle = Self::spawn_actor(runtime, request_receiver);
263
264        Self {
265            inner: Arc::new(Inner {
266                request_sender: Some(request_sender),
267                join_handle: Some(handle),
268            }),
269        }
270    }
271
272    /// This creates the new instance of CurlActor to handle Curl perform asynchronously using Curl Multi
273    /// in a background thread to avoid blocking of other tasks. The user can provide a custom runtime
274    /// to use for the background task.
275    pub fn new_runtime(runtime: Runtime) -> Self {
276        let (request_sender, request_receiver) = mpsc::channel::<Request<H>>(1);
277
278        let handle = Self::spawn_actor(runtime, request_receiver);
279
280        Self {
281            inner: Arc::new(Inner {
282                request_sender: Some(request_sender),
283                join_handle: Some(handle),
284            }),
285        }
286    }
287
288    /// Create a new CurlActor with a user-provided runtime and configurable channel capacity.
289    pub fn new_runtime_with_capacity(runtime: Runtime, capacity: usize) -> Self {
290        let (request_sender, request_receiver) = mpsc::channel::<Request<H>>(capacity);
291
292        let handle = Self::spawn_actor(runtime, request_receiver);
293
294        Self {
295            inner: Arc::new(Inner {
296                request_sender: Some(request_sender),
297                join_handle: Some(handle),
298            }),
299        }
300    }
301
302    fn spawn_actor(runtime: Runtime, mut request_receiver: Receiver<Request<H>>) -> JoinHandle<()> {
303        std::thread::spawn(move || {
304            let local = LocalSet::new();
305            local.spawn_local(async move {
306                while let Some(Request(easy2, oneshot_sender, transfer_type)) =
307                    request_receiver.recv().await
308                {
309                    tokio::task::spawn_local(async move {
310                        let response = match transfer_type {
311                            TransferType::Easy2 => perform_curl_easy2(easy2).await,
312                            TransferType::Multi => perform_curl_multi(easy2).await,
313                        };
314                        if let Err(res) = oneshot_sender.send(response) {
315                            trace!("Warning! The receiver has been dropped. {:?}", res);
316                        }
317                    });
318                }
319            });
320            runtime.block_on(local);
321        })
322    }
323}
324
325async fn perform_curl_multi<H: Handler + Debug + Send + 'static>(
326    easy2: Easy2<H>,
327) -> Result<Easy2<H>, Error<H>> {
328    trace!("perform_curl_multi: starting curl multi operation");
329    tokio::task::spawn_blocking(move || -> Result<Easy2<H>, Error<H>> {
330        let multi = Multi::new();
331        let handle = multi.add2(easy2).map_err(|e| Error::Multi(e))?;
332
333        while multi.perform().map_err(|e| Error::Multi(e))? != 0 {
334            let timeout_result = multi
335                .get_timeout()
336                .map(|d| d.unwrap_or_else(|| Duration::from_secs(2)));
337
338            let timeout = match timeout_result {
339                Ok(duration) => duration,
340                Err(multi_error) => {
341                    if !multi_error.is_call_perform() {
342                        return Err(Error::Multi(multi_error));
343                    }
344                    Duration::ZERO
345                }
346            };
347
348            if !timeout.is_zero() {
349                trace!(
350                    "perform_curl_multi: waiting for IO or timeout {:?}",
351                    timeout
352                );
353                let ready = multi.wait(&mut [], timeout).map_err(Error::Multi)?;
354                trace!(
355                    "perform_curl_multi: wait completed, {} sockets ready",
356                    ready
357                );
358            }
359        }
360
361        // Inspect messages for transfer-level errors.
362        let mut transfer_error: Option<Error<H>> = None;
363        multi.messages(|msg| {
364            if let Some(Err(e)) = msg.result() {
365                transfer_error = Some(Error::Curl(e));
366            }
367        });
368
369        // Always attempt to remove the handle to clean up resources. If there was
370        // a transfer error prefer returning that error, but still try to perform
371        // the removal and log any cleanup failure.
372        let cleanup = multi.remove2(handle).map_err(|e| Error::Multi(e));
373
374        if let Some(e) = transfer_error {
375            if let Err(ref clean_err) = cleanup {
376                trace!(
377                    "perform_curl_multi: remove2 failed during cleanup: {:?}",
378                    clean_err
379                );
380            }
381            Err(e)
382        } else {
383            cleanup
384        }
385    })
386    .await
387    .map_err(Error::JoinError)?
388}
389
390async fn perform_curl_easy2<H: Handler + Debug + Send + 'static>(
391    easy2: Easy2<H>,
392) -> Result<Easy2<H>, Error<H>> {
393    trace!("perform_curl_easy2: starting curl easy2 operation");
394    tokio::task::spawn_blocking(move || -> Result<Easy2<H>, Error<H>> {
395        easy2.perform().map_err(|e| Error::Curl(e))?;
396        Ok(easy2)
397    })
398    .await
399    .map_err(Error::JoinError)?
400}