async_curl/actor.rs
1use std::fmt::Debug;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use curl::easy::{Easy2, Handler};
6use curl::multi::Multi;
7use log::trace;
8use tokio::runtime::{Builder, Runtime};
9use tokio::sync::mpsc::{self, Receiver, Sender};
10use tokio::sync::oneshot;
11use tokio::task::LocalSet;
12
13use crate::error::Error;
14
15#[async_trait]
16pub trait Actor<H>
17where
18 H: Handler + Debug + Send + 'static,
19{
20 async fn send_request(&self, easy2: Easy2<H>) -> Result<Easy2<H>, Error<H>>;
21}
22
23/// CurlActor is responsible for performing
24/// the contructed Easy2 object at the background
25/// to perform it asynchronously.
26/// ```
27/// use async_curl::actor::{Actor, CurlActor};
28/// use curl::easy::{Easy2, Handler, WriteError};
29///
30/// #[derive(Debug, Clone, Default)]
31/// pub struct ResponseHandler {
32/// data: Vec<u8>,
33/// }
34///
35/// impl Handler for ResponseHandler {
36/// /// This will store the response from the server
37/// /// to the data vector.
38/// fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
39/// self.data.extend_from_slice(data);
40/// Ok(data.len())
41/// }
42/// }
43///
44/// impl ResponseHandler {
45/// /// Instantiation of the ResponseHandler
46/// /// and initialize the data vector.
47/// pub fn new() -> Self {
48/// Self::default()
49/// }
50///
51/// /// This will consumed the object and
52/// /// give the data to the caller
53/// pub fn get_data(self) -> Vec<u8> {
54/// self.data
55/// }
56/// }
57///
58/// # #[tokio::main(flavor = "current_thread")]
59/// # async fn main() -> Result<(), Box<dyn std::error::Error>>{
60/// let curl = CurlActor::new();
61/// let mut easy2 = Easy2::new(ResponseHandler::new());
62///
63/// easy2.url("https://www.rust-lang.org").unwrap();
64/// easy2.get(true).unwrap();
65///
66/// let response = curl.send_request(easy2).await.unwrap();
67/// eprintln!("{:?}", response.get_ref());
68///
69/// Ok(())
70/// # }
71/// ```
72///
73/// Example for multiple request executed
74/// at the same time.
75///
76/// ```
77/// use async_curl::actor::{Actor, CurlActor};
78/// use curl::easy::{Easy2, Handler, WriteError};
79///
80/// #[derive(Debug, Clone, Default)]
81/// pub struct ResponseHandler {
82/// data: Vec<u8>,
83/// }
84///
85/// impl Handler for ResponseHandler {
86/// /// This will store the response from the server
87/// /// to the data vector.
88/// fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
89/// self.data.extend_from_slice(data);
90/// Ok(data.len())
91/// }
92/// }
93///
94/// impl ResponseHandler {
95/// /// Instantiation of the ResponseHandler
96/// /// and initialize the data vector.
97/// pub fn new() -> Self {
98/// Self::default()
99/// }
100///
101/// /// This will consumed the object and
102/// /// give the data to the caller
103/// pub fn get_data(self) -> Vec<u8> {
104/// self.data
105/// }
106/// }
107///
108/// # #[tokio::main(flavor = "current_thread")]
109/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
110/// let actor = CurlActor::new();
111/// let mut easy2 = Easy2::new(ResponseHandler::new());
112/// easy2.url("https://www.rust-lang.org").unwrap();
113/// easy2.get(true).unwrap();
114///
115/// let actor1 = actor.clone();
116/// let spawn1 = tokio::spawn(async move {
117/// let response = actor1.send_request(easy2).await;
118/// let mut response = response.unwrap();
119///
120/// // Response body
121/// eprintln!(
122/// "Task 1 : {}",
123/// String::from_utf8_lossy(&response.get_ref().to_owned().get_data())
124/// );
125/// // Response status code
126/// let status_code = response.response_code().unwrap();
127/// eprintln!("Task 1 : {}", status_code);
128/// });
129///
130/// let mut easy2 = Easy2::new(ResponseHandler::new());
131/// easy2.url("https://www.rust-lang.org").unwrap();
132/// easy2.get(true).unwrap();
133///
134/// let spawn2 = tokio::spawn(async move {
135/// let response = actor.send_request(easy2).await;
136/// let mut response = response.unwrap();
137///
138/// // Response body
139/// eprintln!(
140/// "Task 2 : {}",
141/// String::from_utf8_lossy(&response.get_ref().to_owned().get_data())
142/// );
143/// // Response status code
144/// let status_code = response.response_code().unwrap();
145/// eprintln!("Task 2 : {}", status_code);
146/// });
147/// let (_, _) = tokio::join!(spawn1, spawn2);
148///
149/// Ok(())
150/// # }
151/// ```
152///
153use std::sync::Arc;
154use std::thread::JoinHandle;
155
156struct Inner<H>
157where
158 H: Handler + Debug + Send + 'static,
159{
160 request_sender: Option<Sender<Request<H>>>,
161 join_handle: Option<JoinHandle<()>>,
162}
163
164impl<H> Drop for Inner<H>
165where
166 H: Handler + Debug + Send + 'static,
167{
168 fn drop(&mut self) {
169 // Take and drop the sender so the background actor sees channel closed.
170 if let Some(sender) = self.request_sender.take() {
171 trace!("Dropping request sender to signal background actor to shut down.");
172 drop(sender);
173 trace!("Request sender dropped, signaling background actor to shut down.");
174 }
175 // Join the background thread to ensure graceful shutdown.
176 if let Some(handle) = self.join_handle.take() {
177 trace!("Attempting to join background actor thread for graceful shutdown...");
178 let _ = handle.join();
179 trace!("Background actor thread joined successfully.");
180 }
181 }
182}
183
184#[derive(Clone)]
185pub struct CurlActor<H>
186where
187 H: Handler + Debug + Send + 'static,
188{
189 inner: Arc<Inner<H>>,
190}
191
192impl<H> Default for CurlActor<H>
193where
194 H: Handler + Debug + Send + 'static,
195{
196 fn default() -> Self {
197 Self::new()
198 }
199}
200
201#[async_trait]
202impl<H> Actor<H> for CurlActor<H>
203where
204 H: Handler + Debug + Send + 'static,
205{
206 /// This will send Easy2 into the background task that will perform
207 /// curl asynchronously, await the response in the oneshot receiver and
208 /// return Easy2 back to the caller.
209 async fn send_request(&self, easy2: Easy2<H>) -> Result<Easy2<H>, Error<H>> {
210 let (oneshot_sender, oneshot_receiver) = oneshot::channel::<Result<Easy2<H>, Error<H>>>();
211 self.inner
212 .request_sender
213 .as_ref()
214 .expect("request_sender missing")
215 .send(Request(easy2, oneshot_sender))
216 .await?;
217 oneshot_receiver.await?
218 }
219}
220
221impl<H> CurlActor<H>
222where
223 H: Handler + Debug + Send + 'static,
224{
225 /// This creates the new instance of CurlActor to handle Curl perform asynchronously using Curl Multi
226 /// in a background thread to avoid blocking of other tasks.
227 pub fn new() -> Self {
228 let runtime = Builder::new_current_thread().enable_all().build().unwrap();
229 let (request_sender, request_receiver) = mpsc::channel::<Request<H>>(1);
230
231 let handle = Self::spawn_actor(runtime, request_receiver);
232
233 Self {
234 inner: Arc::new(Inner {
235 request_sender: Some(request_sender),
236 join_handle: Some(handle),
237 }),
238 }
239 }
240
241 /// This creates the new instance of CurlActor to handle Curl perform asynchronously using Curl Multi
242 /// in a background thread to avoid blocking of other tasks. The user can provide a custom runtime
243 /// to use for the background task.
244 pub fn new_runtime(runtime: Runtime) -> Self {
245 let (request_sender, request_receiver) = mpsc::channel::<Request<H>>(1);
246
247 let handle = Self::spawn_actor(runtime, request_receiver);
248
249 Self {
250 inner: Arc::new(Inner {
251 request_sender: Some(request_sender),
252 join_handle: Some(handle),
253 }),
254 }
255 }
256
257 /// Create a new CurlActor with a user-provided runtime and configurable channel capacity.
258 pub fn new_runtime_with_capacity(runtime: Runtime, capacity: usize) -> Self {
259 let (request_sender, request_receiver) = mpsc::channel::<Request<H>>(capacity);
260
261 let handle = Self::spawn_actor(runtime, request_receiver);
262
263 Self {
264 inner: Arc::new(Inner {
265 request_sender: Some(request_sender),
266 join_handle: Some(handle),
267 }),
268 }
269 }
270
271 fn spawn_actor(runtime: Runtime, mut request_receiver: Receiver<Request<H>>) -> JoinHandle<()> {
272 std::thread::spawn(move || {
273 let local = LocalSet::new();
274 local.spawn_local(async move {
275 while let Some(Request(easy2, oneshot_sender)) = request_receiver.recv().await {
276 tokio::task::spawn_local(async move {
277 let response = perform_curl_multi(easy2).await;
278 if let Err(res) = oneshot_sender.send(response) {
279 trace!("Warning! The receiver has been dropped. {:?}", res);
280 }
281 });
282 }
283 });
284 runtime.block_on(local);
285 })
286 }
287}
288
289async fn perform_curl_multi<H: Handler + Debug + Send + 'static>(
290 easy2: Easy2<H>,
291) -> Result<Easy2<H>, Error<H>> {
292 tokio::task::spawn_blocking(move || -> Result<Easy2<H>, Error<H>> {
293 let multi = Multi::new();
294 let handle = multi.add2(easy2).map_err(|e| Error::Multi(e))?;
295
296 while multi.perform().map_err(|e| Error::Multi(e))? != 0 {
297 let timeout_result = multi
298 .get_timeout()
299 .map(|d| d.unwrap_or_else(|| Duration::from_secs(2)));
300
301 let timeout = match timeout_result {
302 Ok(duration) => duration,
303 Err(multi_error) => {
304 if !multi_error.is_call_perform() {
305 return Err(Error::Multi(multi_error));
306 }
307 Duration::ZERO
308 }
309 };
310
311 if !timeout.is_zero() {
312 trace!(
313 "perform_curl_multi: waiting for IO or timeout {:?}",
314 timeout
315 );
316 let ready = multi.wait(&mut [], timeout).map_err(Error::Multi)?;
317 trace!(
318 "perform_curl_multi: wait completed, {} sockets ready",
319 ready
320 );
321 }
322 }
323
324 // Inspect messages for transfer-level errors.
325 let mut transfer_error: Option<Error<H>> = None;
326 multi.messages(|msg| {
327 if let Some(Err(e)) = msg.result() {
328 transfer_error = Some(Error::Curl(e));
329 }
330 });
331
332 // Always attempt to remove the handle to clean up resources. If there was
333 // a transfer error prefer returning that error, but still try to perform
334 // the removal and log any cleanup failure.
335 let cleanup = multi.remove2(handle).map_err(|e| Error::Multi(e));
336
337 if let Some(e) = transfer_error {
338 if let Err(ref clean_err) = cleanup {
339 trace!(
340 "perform_curl_multi: remove2 failed during cleanup: {:?}",
341 clean_err
342 );
343 }
344 Err(e)
345 } else {
346 cleanup
347 }
348 })
349 .await
350 .map_err(Error::JoinError)?
351}
352
353/// This contains the Easy2 object and a oneshot sender channel when passing into the
354/// background task to perform Curl asynchronously.
355#[derive(Debug)]
356pub struct Request<H: Handler + Debug + Send + 'static>(
357 Easy2<H>,
358 oneshot::Sender<Result<Easy2<H>, Error<H>>>,
359);