Skip to main content

async_curl/
actor.rs

1use std::fmt::Debug;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use curl::easy::{Easy2, Handler};
6use curl::multi::Multi;
7use log::trace;
8use tokio::runtime::{Builder, Runtime};
9use tokio::sync::mpsc::{self, Receiver, Sender};
10use tokio::sync::oneshot;
11use tokio::task::LocalSet;
12
13use crate::error::Error;
14
15#[async_trait]
16pub trait Actor<H>
17where
18    H: Handler + Debug + Send + 'static,
19{
20    async fn send_request(&self, easy2: Easy2<H>) -> Result<Easy2<H>, Error<H>>;
21}
22
23/// CurlActor is responsible for performing
24/// the contructed Easy2 object at the background
25/// to perform it asynchronously.
26/// ```
27/// use async_curl::actor::{Actor, CurlActor};
28/// use curl::easy::{Easy2, Handler, WriteError};
29///
30/// #[derive(Debug, Clone, Default)]
31/// pub struct ResponseHandler {
32///     data: Vec<u8>,
33/// }
34///
35/// impl Handler for ResponseHandler {
36///     /// This will store the response from the server
37///     /// to the data vector.
38///     fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
39///         self.data.extend_from_slice(data);
40///         Ok(data.len())
41///     }
42/// }
43///
44/// impl ResponseHandler {
45///     /// Instantiation of the ResponseHandler
46///     /// and initialize the data vector.
47///     pub fn new() -> Self {
48///         Self::default()
49///     }
50///
51///     /// This will consumed the object and
52///     /// give the data to the caller
53///     pub fn get_data(self) -> Vec<u8> {
54///         self.data
55///     }
56/// }
57///
58/// # #[tokio::main(flavor = "current_thread")]
59/// # async fn main() -> Result<(), Box<dyn std::error::Error>>{
60/// let curl = CurlActor::new();
61/// let mut easy2 = Easy2::new(ResponseHandler::new());
62///
63/// easy2.url("https://www.rust-lang.org").unwrap();
64/// easy2.get(true).unwrap();
65///
66/// let response = curl.send_request(easy2).await.unwrap();
67/// eprintln!("{:?}", response.get_ref());
68///
69/// Ok(())
70/// # }
71/// ```
72///
73/// Example for multiple request executed
74/// at the same time.
75///
76/// ```
77/// use async_curl::actor::{Actor, CurlActor};
78/// use curl::easy::{Easy2, Handler, WriteError};
79///
80/// #[derive(Debug, Clone, Default)]
81/// pub struct ResponseHandler {
82///     data: Vec<u8>,
83/// }
84///
85/// impl Handler for ResponseHandler {
86///     /// This will store the response from the server
87///     /// to the data vector.
88///     fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
89///         self.data.extend_from_slice(data);
90///         Ok(data.len())
91///     }
92/// }
93///
94/// impl ResponseHandler {
95///     /// Instantiation of the ResponseHandler
96///     /// and initialize the data vector.
97///     pub fn new() -> Self {
98///         Self::default()
99///     }
100///
101///     /// This will consumed the object and
102///     /// give the data to the caller
103///     pub fn get_data(self) -> Vec<u8> {
104///         self.data
105///     }
106/// }
107///
108/// # #[tokio::main(flavor = "current_thread")]
109/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
110/// let actor = CurlActor::new();
111/// let mut easy2 = Easy2::new(ResponseHandler::new());
112/// easy2.url("https://www.rust-lang.org").unwrap();
113/// easy2.get(true).unwrap();
114///
115/// let actor1 = actor.clone();
116/// let spawn1 = tokio::spawn(async move {
117///     let response = actor1.send_request(easy2).await;
118///     let mut response = response.unwrap();
119///
120///     // Response body
121///     eprintln!(
122///         "Task 1 : {}",
123///         String::from_utf8_lossy(&response.get_ref().to_owned().get_data())
124///     );
125///     // Response status code
126///     let status_code = response.response_code().unwrap();
127///     eprintln!("Task 1 : {}", status_code);
128/// });
129///
130/// let mut easy2 = Easy2::new(ResponseHandler::new());
131/// easy2.url("https://www.rust-lang.org").unwrap();
132/// easy2.get(true).unwrap();
133///
134/// let spawn2 = tokio::spawn(async move {
135///     let response = actor.send_request(easy2).await;
136///     let mut response = response.unwrap();
137///
138///     // Response body
139///     eprintln!(
140///         "Task 2 : {}",
141///         String::from_utf8_lossy(&response.get_ref().to_owned().get_data())
142///     );
143///     // Response status code
144///     let status_code = response.response_code().unwrap();
145///     eprintln!("Task 2 : {}", status_code);
146/// });
147/// let (_, _) = tokio::join!(spawn1, spawn2);
148///
149/// Ok(())
150/// # }
151/// ```
152///
153use std::sync::Arc;
154use std::thread::JoinHandle;
155
156struct Inner<H>
157where
158    H: Handler + Debug + Send + 'static,
159{
160    request_sender: Option<Sender<Request<H>>>,
161    join_handle: Option<JoinHandle<()>>,
162}
163
164impl<H> Drop for Inner<H>
165where
166    H: Handler + Debug + Send + 'static,
167{
168    fn drop(&mut self) {
169        // Take and drop the sender so the background actor sees channel closed.
170        if let Some(sender) = self.request_sender.take() {
171            trace!("Dropping request sender to signal background actor to shut down.");
172            drop(sender);
173            trace!("Request sender dropped, signaling background actor to shut down.");
174        }
175        // Join the background thread to ensure graceful shutdown.
176        if let Some(handle) = self.join_handle.take() {
177            trace!("Attempting to join background actor thread for graceful shutdown...");
178            let _ = handle.join();
179            trace!("Background actor thread joined successfully.");
180        }
181    }
182}
183
184#[derive(Clone)]
185pub struct CurlActor<H>
186where
187    H: Handler + Debug + Send + 'static,
188{
189    inner: Arc<Inner<H>>,
190}
191
192impl<H> Default for CurlActor<H>
193where
194    H: Handler + Debug + Send + 'static,
195{
196    fn default() -> Self {
197        Self::new()
198    }
199}
200
201#[async_trait]
202impl<H> Actor<H> for CurlActor<H>
203where
204    H: Handler + Debug + Send + 'static,
205{
206    /// This will send Easy2 into the background task that will perform
207    /// curl asynchronously, await the response in the oneshot receiver and
208    /// return Easy2 back to the caller.
209    async fn send_request(&self, easy2: Easy2<H>) -> Result<Easy2<H>, Error<H>> {
210        let (oneshot_sender, oneshot_receiver) = oneshot::channel::<Result<Easy2<H>, Error<H>>>();
211        self.inner
212            .request_sender
213            .as_ref()
214            .expect("request_sender missing")
215            .send(Request(easy2, oneshot_sender))
216            .await?;
217        oneshot_receiver.await?
218    }
219}
220
221impl<H> CurlActor<H>
222where
223    H: Handler + Debug + Send + 'static,
224{
225    /// This creates the new instance of CurlActor to handle Curl perform asynchronously using Curl Multi
226    /// in a background thread to avoid blocking of other tasks.
227    pub fn new() -> Self {
228        let runtime = Builder::new_current_thread().enable_all().build().unwrap();
229        let (request_sender, request_receiver) = mpsc::channel::<Request<H>>(1);
230
231        let handle = Self::spawn_actor(runtime, request_receiver);
232
233        Self {
234            inner: Arc::new(Inner {
235                request_sender: Some(request_sender),
236                join_handle: Some(handle),
237            }),
238        }
239    }
240
241    /// This creates the new instance of CurlActor to handle Curl perform asynchronously using Curl Multi
242    /// in a background thread to avoid blocking of other tasks. The user can provide a custom runtime
243    /// to use for the background task.
244    pub fn new_runtime(runtime: Runtime) -> Self {
245        let (request_sender, request_receiver) = mpsc::channel::<Request<H>>(1);
246
247        let handle = Self::spawn_actor(runtime, request_receiver);
248
249        Self {
250            inner: Arc::new(Inner {
251                request_sender: Some(request_sender),
252                join_handle: Some(handle),
253            }),
254        }
255    }
256
257    /// Create a new CurlActor with a user-provided runtime and configurable channel capacity.
258    pub fn new_runtime_with_capacity(runtime: Runtime, capacity: usize) -> Self {
259        let (request_sender, request_receiver) = mpsc::channel::<Request<H>>(capacity);
260
261        let handle = Self::spawn_actor(runtime, request_receiver);
262
263        Self {
264            inner: Arc::new(Inner {
265                request_sender: Some(request_sender),
266                join_handle: Some(handle),
267            }),
268        }
269    }
270
271    fn spawn_actor(runtime: Runtime, mut request_receiver: Receiver<Request<H>>) -> JoinHandle<()> {
272        std::thread::spawn(move || {
273            let local = LocalSet::new();
274            local.spawn_local(async move {
275                while let Some(Request(easy2, oneshot_sender)) = request_receiver.recv().await {
276                    tokio::task::spawn_local(async move {
277                        let response = perform_curl_multi(easy2).await;
278                        if let Err(res) = oneshot_sender.send(response) {
279                            trace!("Warning! The receiver has been dropped. {:?}", res);
280                        }
281                    });
282                }
283            });
284            runtime.block_on(local);
285        })
286    }
287}
288
289async fn perform_curl_multi<H: Handler + Debug + Send + 'static>(
290    easy2: Easy2<H>,
291) -> Result<Easy2<H>, Error<H>> {
292    tokio::task::spawn_blocking(move || -> Result<Easy2<H>, Error<H>> {
293        let multi = Multi::new();
294        let handle = multi.add2(easy2).map_err(|e| Error::Multi(e))?;
295
296        while multi.perform().map_err(|e| Error::Multi(e))? != 0 {
297            let timeout_result = multi
298                .get_timeout()
299                .map(|d| d.unwrap_or_else(|| Duration::from_secs(2)));
300
301            let timeout = match timeout_result {
302                Ok(duration) => duration,
303                Err(multi_error) => {
304                    if !multi_error.is_call_perform() {
305                        return Err(Error::Multi(multi_error));
306                    }
307                    Duration::ZERO
308                }
309            };
310
311            if !timeout.is_zero() {
312                trace!(
313                    "perform_curl_multi: waiting for IO or timeout {:?}",
314                    timeout
315                );
316                let ready = multi.wait(&mut [], timeout).map_err(Error::Multi)?;
317                trace!(
318                    "perform_curl_multi: wait completed, {} sockets ready",
319                    ready
320                );
321            }
322        }
323
324        // Inspect messages for transfer-level errors.
325        let mut transfer_error: Option<Error<H>> = None;
326        multi.messages(|msg| {
327            if let Some(Err(e)) = msg.result() {
328                transfer_error = Some(Error::Curl(e));
329            }
330        });
331
332        // Always attempt to remove the handle to clean up resources. If there was
333        // a transfer error prefer returning that error, but still try to perform
334        // the removal and log any cleanup failure.
335        let cleanup = multi.remove2(handle).map_err(|e| Error::Multi(e));
336
337        if let Some(e) = transfer_error {
338            if let Err(ref clean_err) = cleanup {
339                trace!(
340                    "perform_curl_multi: remove2 failed during cleanup: {:?}",
341                    clean_err
342                );
343            }
344            Err(e)
345        } else {
346            cleanup
347        }
348    })
349    .await
350    .map_err(Error::JoinError)?
351}
352
353/// This contains the Easy2 object and a oneshot sender channel when passing into the
354/// background task to perform Curl asynchronously.
355#[derive(Debug)]
356pub struct Request<H: Handler + Debug + Send + 'static>(
357    Easy2<H>,
358    oneshot::Sender<Result<Easy2<H>, Error<H>>>,
359);