Skip to main content

async_curl/
actor.rs

1use std::fmt::Debug;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use curl::easy::{Easy2, Handler};
6use curl::multi::Multi;
7use log::trace;
8use tokio::runtime::{Builder, Runtime};
9use tokio::sync::mpsc::{self, Receiver, Sender};
10use tokio::sync::oneshot;
11use tokio::task::LocalSet;
12
13use crate::error::Error;
14
15#[async_trait]
16pub trait Actor<H>
17where
18    H: Handler + Debug + Send + 'static,
19{
20    async fn send_request(&self, easy2: Easy2<H>) -> Result<Easy2<H>, Error<H>>;
21}
22
/// CurlActor is responsible for performing
/// the constructed Easy2 object in the background
/// so that the transfer runs asynchronously.
26/// ```
27/// use async_curl::actor::{Actor, CurlActor};
28/// use curl::easy::{Easy2, Handler, WriteError};
29///
30/// #[derive(Debug, Clone, Default)]
31/// pub struct ResponseHandler {
32///     data: Vec<u8>,
33/// }
34///
35/// impl Handler for ResponseHandler {
36///     /// This will store the response from the server
37///     /// to the data vector.
38///     fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
39///         self.data.extend_from_slice(data);
40///         Ok(data.len())
41///     }
42/// }
43///
44/// impl ResponseHandler {
45///     /// Instantiation of the ResponseHandler
46///     /// and initialize the data vector.
47///     pub fn new() -> Self {
48///         Self::default()
49///     }
50///
///     /// This will consume the object and
///     /// give the data to the caller
53///     pub fn get_data(self) -> Vec<u8> {
54///         self.data
55///     }
56/// }
57///
58/// # #[tokio::main(flavor = "current_thread")]
59/// # async fn main() -> Result<(), Box<dyn std::error::Error>>{
60/// let curl = CurlActor::new();
61/// let mut easy2 = Easy2::new(ResponseHandler::new());
62///
63/// easy2.url("https://www.rust-lang.org").unwrap();
64/// easy2.get(true).unwrap();
65///
66/// let response = curl.send_request(easy2).await.unwrap();
67/// eprintln!("{:?}", response.get_ref());
68///
69/// Ok(())
70/// # }
71/// ```
72///
73/// Example for multiple request executed
74/// at the same time.
75///
76/// ```
77/// use async_curl::actor::{Actor, CurlActor};
78/// use curl::easy::{Easy2, Handler, WriteError};
79///
80/// #[derive(Debug, Clone, Default)]
81/// pub struct ResponseHandler {
82///     data: Vec<u8>,
83/// }
84///
85/// impl Handler for ResponseHandler {
86///     /// This will store the response from the server
87///     /// to the data vector.
88///     fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
89///         self.data.extend_from_slice(data);
90///         Ok(data.len())
91///     }
92/// }
93///
94/// impl ResponseHandler {
95///     /// Instantiation of the ResponseHandler
96///     /// and initialize the data vector.
97///     pub fn new() -> Self {
98///         Self::default()
99///     }
100///
///     /// This will consume the object and
///     /// give the data to the caller
103///     pub fn get_data(self) -> Vec<u8> {
104///         self.data
105///     }
106/// }
107///
108/// # #[tokio::main(flavor = "current_thread")]
109/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
110/// let actor = CurlActor::new();
111/// let mut easy2 = Easy2::new(ResponseHandler::new());
112/// easy2.url("https://www.rust-lang.org").unwrap();
113/// easy2.get(true).unwrap();
114///
115/// let actor1 = actor.clone();
116/// let spawn1 = tokio::spawn(async move {
117///     let response = actor1.send_request(easy2).await;
118///     let mut response = response.unwrap();
119///
120///     // Response body
121///     eprintln!(
122///         "Task 1 : {}",
123///         String::from_utf8_lossy(&response.get_ref().to_owned().get_data())
124///     );
125///     // Response status code
126///     let status_code = response.response_code().unwrap();
127///     eprintln!("Task 1 : {}", status_code);
128/// });
129///
130/// let mut easy2 = Easy2::new(ResponseHandler::new());
131/// easy2.url("https://www.rust-lang.org").unwrap();
132/// easy2.get(true).unwrap();
133///
134/// let spawn2 = tokio::spawn(async move {
135///     let response = actor.send_request(easy2).await;
136///     let mut response = response.unwrap();
137///
138///     // Response body
139///     eprintln!(
140///         "Task 2 : {}",
141///         String::from_utf8_lossy(&response.get_ref().to_owned().get_data())
142///     );
143///     // Response status code
144///     let status_code = response.response_code().unwrap();
145///     eprintln!("Task 2 : {}", status_code);
146/// });
147/// let (_, _) = tokio::join!(spawn1, spawn2);
148///
149/// Ok(())
150/// # }
151/// ```
152///
153use std::sync::Arc;
154use std::thread::JoinHandle;
155
156/// This contains the Easy2 object and a oneshot sender channel when passing into the
157/// background task to perform Curl asynchronously.
158#[derive(Debug)]
159pub struct Request<H: Handler + Debug + Send + 'static>(
160    Easy2<H>,
161    oneshot::Sender<Result<Easy2<H>, Error<H>>>,
162    TransferType,
163);
164
/// Selects which of the two transfer strategies the background task uses
/// for a request.
#[derive(Clone, Debug)]
enum TransferType {
    /// Perform the handle through `perform_curl_multi`.
    Multi,
    /// Perform the handle through `perform_curl_easy2`.
    Easy2,
}
171
172struct Inner<H>
173where
174    H: Handler + Debug + Send + 'static,
175{
176    request_sender: Option<Sender<Request<H>>>,
177    join_handle: Option<JoinHandle<()>>,
178}
179
180impl<H> Drop for Inner<H>
181where
182    H: Handler + Debug + Send + 'static,
183{
184    fn drop(&mut self) {
185        // Take and drop the sender so the background actor sees channel closed.
186        if let Some(sender) = self.request_sender.take() {
187            trace!("Dropping request sender to signal background actor to shut down.");
188            drop(sender);
189            trace!("Request sender dropped, signaling background actor to shut down.");
190        }
191        // Join the background thread to ensure graceful shutdown.
192        if let Some(handle) = self.join_handle.take() {
193            trace!("Attempting to join background actor thread for graceful shutdown...");
194            let _ = handle.join();
195            trace!("Background actor thread joined successfully.");
196        }
197    }
198}
199
200#[derive(Clone)]
201pub struct CurlActor<H>
202where
203    H: Handler + Debug + Send + 'static,
204{
205    inner: Arc<Inner<H>>,
206    transfer_type: TransferType,
207}
208
209impl<H> Default for CurlActor<H>
210where
211    H: Handler + Debug + Send + 'static,
212{
213    fn default() -> Self {
214        Self::new()
215    }
216}
217
218#[async_trait]
219impl<H> Actor<H> for CurlActor<H>
220where
221    H: Handler + Debug + Send + 'static,
222{
223    /// This will send Easy2 into the background task that will perform
224    /// curl asynchronously, await the response in the oneshot receiver and
225    /// return Easy2 back to the caller.
226    async fn send_request(&self, easy2: Easy2<H>) -> Result<Easy2<H>, Error<H>> {
227        let (oneshot_sender, oneshot_receiver) = oneshot::channel::<Result<Easy2<H>, Error<H>>>();
228        self.inner
229            .request_sender
230            .as_ref()
231            .expect("request_sender missing")
232            .send(Request(easy2, oneshot_sender, self.transfer_type.clone()))
233            .await?;
234        oneshot_receiver.await?
235    }
236}
237
238impl<H> CurlActor<H>
239where
240    H: Handler + Debug + Send + 'static,
241{
242    /// This creates the new instance of CurlActor to handle Curl perform asynchronously using Curl Multi
243    /// in a background thread to avoid blocking of other tasks.
244    pub fn new() -> Self {
245        let runtime = Builder::new_current_thread().enable_all().build().unwrap();
246        let (request_sender, request_receiver) = mpsc::channel::<Request<H>>(1);
247
248        let handle = Self::spawn_actor(runtime, request_receiver);
249
250        Self {
251            inner: Arc::new(Inner {
252                request_sender: Some(request_sender),
253                join_handle: Some(handle),
254            }),
255            transfer_type: TransferType::Easy2,
256        }
257    }
258
259    /// This creates the new instance of CurlActor to handle Curl perform asynchronously using Curl Multi
260    /// in a background thread to avoid blocking of other tasks. The user can provide a custom runtime
261    /// to use for the background task.
262    pub fn new_runtime(runtime: Runtime) -> Self {
263        let (request_sender, request_receiver) = mpsc::channel::<Request<H>>(1);
264
265        let handle = Self::spawn_actor(runtime, request_receiver);
266
267        Self {
268            inner: Arc::new(Inner {
269                request_sender: Some(request_sender),
270                join_handle: Some(handle),
271            }),
272            transfer_type: TransferType::Easy2,
273        }
274    }
275
276    /// Create a new CurlActor with a user-provided runtime and configurable channel capacity.
277    pub fn new_runtime_with_capacity(runtime: Runtime, capacity: usize) -> Self {
278        let (request_sender, request_receiver) = mpsc::channel::<Request<H>>(capacity);
279
280        let handle = Self::spawn_actor(runtime, request_receiver);
281
282        Self {
283            inner: Arc::new(Inner {
284                request_sender: Some(request_sender),
285                join_handle: Some(handle),
286            }),
287            transfer_type: TransferType::Easy2,
288        }
289    }
290
291    fn spawn_actor(runtime: Runtime, mut request_receiver: Receiver<Request<H>>) -> JoinHandle<()> {
292        std::thread::spawn(move || {
293            let local = LocalSet::new();
294            local.spawn_local(async move {
295                while let Some(Request(easy2, oneshot_sender, transfer_type)) =
296                    request_receiver.recv().await
297                {
298                    tokio::task::spawn_local(async move {
299                        let response = match transfer_type {
300                            TransferType::Easy2 => perform_curl_easy2(easy2).await,
301                            TransferType::Multi => perform_curl_multi(easy2).await,
302                        };
303                        if let Err(res) = oneshot_sender.send(response) {
304                            trace!("Warning! The receiver has been dropped. {:?}", res);
305                        }
306                    });
307                }
308            });
309            runtime.block_on(local);
310        })
311    }
312
313    /// This method allows the user to switch the transfer type to Curl Multi for the CurlActor.
314    pub fn use_multi_transfer(self) -> Self {
315        Self {
316            inner: self.inner,
317            transfer_type: TransferType::Multi,
318        }
319    }
320
321    /// This method allows the user to switch the transfer type to Curl Easy2 for the CurlActor.
322    pub fn use_easy2_transfer(self) -> Self {
323        Self {
324            inner: self.inner,
325            transfer_type: TransferType::Easy2,
326        }
327    }
328}
329
330async fn perform_curl_multi<H: Handler + Debug + Send + 'static>(
331    easy2: Easy2<H>,
332) -> Result<Easy2<H>, Error<H>> {
333    trace!("perform_curl_multi: starting curl multi operation");
334    tokio::task::spawn_blocking(move || -> Result<Easy2<H>, Error<H>> {
335        let multi = Multi::new();
336        let handle = multi.add2(easy2).map_err(|e| Error::Multi(e))?;
337
338        while multi.perform().map_err(|e| Error::Multi(e))? != 0 {
339            let timeout_result = multi
340                .get_timeout()
341                .map(|d| d.unwrap_or_else(|| Duration::from_secs(2)));
342
343            let timeout = match timeout_result {
344                Ok(duration) => duration,
345                Err(multi_error) => {
346                    if !multi_error.is_call_perform() {
347                        return Err(Error::Multi(multi_error));
348                    }
349                    Duration::ZERO
350                }
351            };
352
353            if !timeout.is_zero() {
354                trace!(
355                    "perform_curl_multi: waiting for IO or timeout {:?}",
356                    timeout
357                );
358                let ready = multi.wait(&mut [], timeout).map_err(Error::Multi)?;
359                trace!(
360                    "perform_curl_multi: wait completed, {} sockets ready",
361                    ready
362                );
363            }
364        }
365
366        // Inspect messages for transfer-level errors.
367        let mut transfer_error: Option<Error<H>> = None;
368        multi.messages(|msg| {
369            if let Some(Err(e)) = msg.result() {
370                transfer_error = Some(Error::Curl(e));
371            }
372        });
373
374        // Always attempt to remove the handle to clean up resources. If there was
375        // a transfer error prefer returning that error, but still try to perform
376        // the removal and log any cleanup failure.
377        let cleanup = multi.remove2(handle).map_err(|e| Error::Multi(e));
378
379        if let Some(e) = transfer_error {
380            if let Err(ref clean_err) = cleanup {
381                trace!(
382                    "perform_curl_multi: remove2 failed during cleanup: {:?}",
383                    clean_err
384                );
385            }
386            Err(e)
387        } else {
388            cleanup
389        }
390    })
391    .await
392    .map_err(Error::JoinError)?
393}
394
395async fn perform_curl_easy2<H: Handler + Debug + Send + 'static>(
396    easy2: Easy2<H>,
397) -> Result<Easy2<H>, Error<H>> {
398    trace!("perform_curl_easy2: starting curl easy2 operation");
399    tokio::task::spawn_blocking(move || -> Result<Easy2<H>, Error<H>> {
400        easy2.perform().map_err(|e| Error::Curl(e))?;
401        Ok(easy2)
402    })
403    .await
404    .map_err(Error::JoinError)?
405}