async_curl/actor.rs
1use std::fmt::Debug;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use curl::easy::{Easy2, Handler};
6use curl::multi::Multi;
7use log::trace;
8use tokio::runtime::{Builder, Runtime};
9use tokio::sync::mpsc::{self, Receiver, Sender};
10use tokio::sync::oneshot;
11use tokio::task::LocalSet;
12
13use crate::error::Error;
14
15#[async_trait]
16pub trait Actor<H>
17where
18 H: Handler + Debug + Send + 'static,
19{
20 async fn send_request(&self, easy2: Easy2<H>) -> Result<Easy2<H>, Error<H>>;
21}
22
23/// CurlActor is responsible for performing
24/// the contructed Easy2 object at the background
25/// to perform it asynchronously.
26/// ```
27/// use async_curl::actor::{Actor, CurlActor};
28/// use curl::easy::{Easy2, Handler, WriteError};
29///
30/// #[derive(Debug, Clone, Default)]
31/// pub struct ResponseHandler {
32/// data: Vec<u8>,
33/// }
34///
35/// impl Handler for ResponseHandler {
36/// /// This will store the response from the server
37/// /// to the data vector.
38/// fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
39/// self.data.extend_from_slice(data);
40/// Ok(data.len())
41/// }
42/// }
43///
44/// impl ResponseHandler {
45/// /// Instantiation of the ResponseHandler
46/// /// and initialize the data vector.
47/// pub fn new() -> Self {
48/// Self::default()
49/// }
50///
51/// /// This will consumed the object and
52/// /// give the data to the caller
53/// pub fn get_data(self) -> Vec<u8> {
54/// self.data
55/// }
56/// }
57///
58/// # #[tokio::main(flavor = "current_thread")]
59/// # async fn main() -> Result<(), Box<dyn std::error::Error>>{
60/// let curl = CurlActor::new();
61/// let mut easy2 = Easy2::new(ResponseHandler::new());
62///
63/// easy2.url("https://www.rust-lang.org").unwrap();
64/// easy2.get(true).unwrap();
65///
66/// let response = curl.send_request(easy2).await.unwrap();
67/// eprintln!("{:?}", response.get_ref());
68///
69/// Ok(())
70/// # }
71/// ```
72///
73/// Example for multiple request executed
74/// at the same time.
75///
76/// ```
77/// use async_curl::actor::{Actor, CurlActor};
78/// use curl::easy::{Easy2, Handler, WriteError};
79///
80/// #[derive(Debug, Clone, Default)]
81/// pub struct ResponseHandler {
82/// data: Vec<u8>,
83/// }
84///
85/// impl Handler for ResponseHandler {
86/// /// This will store the response from the server
87/// /// to the data vector.
88/// fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
89/// self.data.extend_from_slice(data);
90/// Ok(data.len())
91/// }
92/// }
93///
94/// impl ResponseHandler {
95/// /// Instantiation of the ResponseHandler
96/// /// and initialize the data vector.
97/// pub fn new() -> Self {
98/// Self::default()
99/// }
100///
101/// /// This will consumed the object and
102/// /// give the data to the caller
103/// pub fn get_data(self) -> Vec<u8> {
104/// self.data
105/// }
106/// }
107///
108/// # #[tokio::main(flavor = "current_thread")]
109/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
110/// let actor = CurlActor::new();
111/// let mut easy2 = Easy2::new(ResponseHandler::new());
112/// easy2.url("https://www.rust-lang.org").unwrap();
113/// easy2.get(true).unwrap();
114///
115/// let actor1 = actor.clone();
116/// let spawn1 = tokio::spawn(async move {
117/// let response = actor1.send_request(easy2).await;
118/// let mut response = response.unwrap();
119///
120/// // Response body
121/// eprintln!(
122/// "Task 1 : {}",
123/// String::from_utf8_lossy(&response.get_ref().to_owned().get_data())
124/// );
125/// // Response status code
126/// let status_code = response.response_code().unwrap();
127/// eprintln!("Task 1 : {}", status_code);
128/// });
129///
130/// let mut easy2 = Easy2::new(ResponseHandler::new());
131/// easy2.url("https://www.rust-lang.org").unwrap();
132/// easy2.get(true).unwrap();
133///
134/// let spawn2 = tokio::spawn(async move {
135/// let response = actor.send_request(easy2).await;
136/// let mut response = response.unwrap();
137///
138/// // Response body
139/// eprintln!(
140/// "Task 2 : {}",
141/// String::from_utf8_lossy(&response.get_ref().to_owned().get_data())
142/// );
143/// // Response status code
144/// let status_code = response.response_code().unwrap();
145/// eprintln!("Task 2 : {}", status_code);
146/// });
147/// let (_, _) = tokio::join!(spawn1, spawn2);
148///
149/// Ok(())
150/// # }
151/// ```
152///
153use std::sync::Arc;
154use std::thread::JoinHandle;
155
156/// This contains the Easy2 object and a oneshot sender channel when passing into the
157/// background task to perform Curl asynchronously.
158#[derive(Debug)]
159pub struct Request<H: Handler + Debug + Send + 'static>(
160 Easy2<H>,
161 oneshot::Sender<Result<Easy2<H>, Error<H>>>,
162 TransferType,
163);
164
165/// This enum is used to differentiate between the two types of transfers: Multi and Easy2.
166#[derive(Debug, Clone)]
167enum TransferType {
168 Multi,
169 Easy2,
170}
171
172struct Inner<H>
173where
174 H: Handler + Debug + Send + 'static,
175{
176 request_sender: Option<Sender<Request<H>>>,
177 join_handle: Option<JoinHandle<()>>,
178}
179
180impl<H> Drop for Inner<H>
181where
182 H: Handler + Debug + Send + 'static,
183{
184 fn drop(&mut self) {
185 // Take and drop the sender so the background actor sees channel closed.
186 if let Some(sender) = self.request_sender.take() {
187 trace!("Dropping request sender to signal background actor to shut down.");
188 drop(sender);
189 trace!("Request sender dropped, signaling background actor to shut down.");
190 }
191 // Join the background thread to ensure graceful shutdown.
192 if let Some(handle) = self.join_handle.take() {
193 trace!("Attempting to join background actor thread for graceful shutdown...");
194 let _ = handle.join();
195 trace!("Background actor thread joined successfully.");
196 }
197 }
198}
199
200#[derive(Clone)]
201pub struct CurlActor<H>
202where
203 H: Handler + Debug + Send + 'static,
204{
205 inner: Arc<Inner<H>>,
206 transfer_type: TransferType,
207}
208
209impl<H> Default for CurlActor<H>
210where
211 H: Handler + Debug + Send + 'static,
212{
213 fn default() -> Self {
214 Self::new()
215 }
216}
217
218#[async_trait]
219impl<H> Actor<H> for CurlActor<H>
220where
221 H: Handler + Debug + Send + 'static,
222{
223 /// This will send Easy2 into the background task that will perform
224 /// curl asynchronously, await the response in the oneshot receiver and
225 /// return Easy2 back to the caller.
226 async fn send_request(&self, easy2: Easy2<H>) -> Result<Easy2<H>, Error<H>> {
227 let (oneshot_sender, oneshot_receiver) = oneshot::channel::<Result<Easy2<H>, Error<H>>>();
228 self.inner
229 .request_sender
230 .as_ref()
231 .expect("request_sender missing")
232 .send(Request(easy2, oneshot_sender, self.transfer_type.clone()))
233 .await?;
234 oneshot_receiver.await?
235 }
236}
237
238impl<H> CurlActor<H>
239where
240 H: Handler + Debug + Send + 'static,
241{
242 /// This creates the new instance of CurlActor to handle Curl perform asynchronously using Curl Multi
243 /// in a background thread to avoid blocking of other tasks.
244 pub fn new() -> Self {
245 let runtime = Builder::new_current_thread().enable_all().build().unwrap();
246 let (request_sender, request_receiver) = mpsc::channel::<Request<H>>(1);
247
248 let handle = Self::spawn_actor(runtime, request_receiver);
249
250 Self {
251 inner: Arc::new(Inner {
252 request_sender: Some(request_sender),
253 join_handle: Some(handle),
254 }),
255 transfer_type: TransferType::Easy2,
256 }
257 }
258
259 /// This creates the new instance of CurlActor to handle Curl perform asynchronously using Curl Multi
260 /// in a background thread to avoid blocking of other tasks. The user can provide a custom runtime
261 /// to use for the background task.
262 pub fn new_runtime(runtime: Runtime) -> Self {
263 let (request_sender, request_receiver) = mpsc::channel::<Request<H>>(1);
264
265 let handle = Self::spawn_actor(runtime, request_receiver);
266
267 Self {
268 inner: Arc::new(Inner {
269 request_sender: Some(request_sender),
270 join_handle: Some(handle),
271 }),
272 transfer_type: TransferType::Easy2,
273 }
274 }
275
276 /// Create a new CurlActor with a user-provided runtime and configurable channel capacity.
277 pub fn new_runtime_with_capacity(runtime: Runtime, capacity: usize) -> Self {
278 let (request_sender, request_receiver) = mpsc::channel::<Request<H>>(capacity);
279
280 let handle = Self::spawn_actor(runtime, request_receiver);
281
282 Self {
283 inner: Arc::new(Inner {
284 request_sender: Some(request_sender),
285 join_handle: Some(handle),
286 }),
287 transfer_type: TransferType::Easy2,
288 }
289 }
290
291 fn spawn_actor(runtime: Runtime, mut request_receiver: Receiver<Request<H>>) -> JoinHandle<()> {
292 std::thread::spawn(move || {
293 let local = LocalSet::new();
294 local.spawn_local(async move {
295 while let Some(Request(easy2, oneshot_sender, transfer_type)) =
296 request_receiver.recv().await
297 {
298 tokio::task::spawn_local(async move {
299 let response = match transfer_type {
300 TransferType::Easy2 => perform_curl_easy2(easy2).await,
301 TransferType::Multi => perform_curl_multi(easy2).await,
302 };
303 if let Err(res) = oneshot_sender.send(response) {
304 trace!("Warning! The receiver has been dropped. {:?}", res);
305 }
306 });
307 }
308 });
309 runtime.block_on(local);
310 })
311 }
312
313 /// This method allows the user to switch the transfer type to Curl Multi for the CurlActor.
314 pub fn use_multi_transfer(self) -> Self {
315 Self {
316 inner: self.inner,
317 transfer_type: TransferType::Multi,
318 }
319 }
320
321 /// This method allows the user to switch the transfer type to Curl Easy2 for the CurlActor.
322 pub fn use_easy2_transfer(self) -> Self {
323 Self {
324 inner: self.inner,
325 transfer_type: TransferType::Easy2,
326 }
327 }
328}
329
330async fn perform_curl_multi<H: Handler + Debug + Send + 'static>(
331 easy2: Easy2<H>,
332) -> Result<Easy2<H>, Error<H>> {
333 trace!("perform_curl_multi: starting curl multi operation");
334 tokio::task::spawn_blocking(move || -> Result<Easy2<H>, Error<H>> {
335 let multi = Multi::new();
336 let handle = multi.add2(easy2).map_err(|e| Error::Multi(e))?;
337
338 while multi.perform().map_err(|e| Error::Multi(e))? != 0 {
339 let timeout_result = multi
340 .get_timeout()
341 .map(|d| d.unwrap_or_else(|| Duration::from_secs(2)));
342
343 let timeout = match timeout_result {
344 Ok(duration) => duration,
345 Err(multi_error) => {
346 if !multi_error.is_call_perform() {
347 return Err(Error::Multi(multi_error));
348 }
349 Duration::ZERO
350 }
351 };
352
353 if !timeout.is_zero() {
354 trace!(
355 "perform_curl_multi: waiting for IO or timeout {:?}",
356 timeout
357 );
358 let ready = multi.wait(&mut [], timeout).map_err(Error::Multi)?;
359 trace!(
360 "perform_curl_multi: wait completed, {} sockets ready",
361 ready
362 );
363 }
364 }
365
366 // Inspect messages for transfer-level errors.
367 let mut transfer_error: Option<Error<H>> = None;
368 multi.messages(|msg| {
369 if let Some(Err(e)) = msg.result() {
370 transfer_error = Some(Error::Curl(e));
371 }
372 });
373
374 // Always attempt to remove the handle to clean up resources. If there was
375 // a transfer error prefer returning that error, but still try to perform
376 // the removal and log any cleanup failure.
377 let cleanup = multi.remove2(handle).map_err(|e| Error::Multi(e));
378
379 if let Some(e) = transfer_error {
380 if let Err(ref clean_err) = cleanup {
381 trace!(
382 "perform_curl_multi: remove2 failed during cleanup: {:?}",
383 clean_err
384 );
385 }
386 Err(e)
387 } else {
388 cleanup
389 }
390 })
391 .await
392 .map_err(Error::JoinError)?
393}
394
395async fn perform_curl_easy2<H: Handler + Debug + Send + 'static>(
396 easy2: Easy2<H>,
397) -> Result<Easy2<H>, Error<H>> {
398 trace!("perform_curl_easy2: starting curl easy2 operation");
399 tokio::task::spawn_blocking(move || -> Result<Easy2<H>, Error<H>> {
400 easy2.perform().map_err(|e| Error::Curl(e))?;
401 Ok(easy2)
402 })
403 .await
404 .map_err(Error::JoinError)?
405}