async_curl/actor.rs
1use std::fmt::Debug;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use curl::easy::{Easy2, Handler};
6use curl::multi::Multi;
7use log::trace;
8use tokio::runtime::{Builder, Runtime};
9use tokio::sync::mpsc::{self, Receiver, Sender};
10use tokio::sync::oneshot;
11use tokio::task::LocalSet;
12use tokio::time::sleep;
13
14use crate::error::Error;
15
16#[async_trait]
17pub trait Actor<H>
18where
19 H: Handler + Debug + Send + 'static,
20{
21 async fn send_request(&self, easy2: Easy2<H>) -> Result<Easy2<H>, Error<H>>;
22}
23
/// CurlActor is responsible for performing
/// the constructed Easy2 object in the background
/// so that it runs asynchronously.
27/// ```
28/// use async_curl::actor::{Actor, CurlActor};
29/// use curl::easy::{Easy2, Handler, WriteError};
30///
31/// #[derive(Debug, Clone, Default)]
32/// pub struct ResponseHandler {
33/// data: Vec<u8>,
34/// }
35///
36/// impl Handler for ResponseHandler {
37/// /// This will store the response from the server
38/// /// to the data vector.
39/// fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
40/// self.data.extend_from_slice(data);
41/// Ok(data.len())
42/// }
43/// }
44///
45/// impl ResponseHandler {
46/// /// Instantiation of the ResponseHandler
47/// /// and initialize the data vector.
48/// pub fn new() -> Self {
49/// Self::default()
50/// }
51///
/// /// This will consume the object and
53/// /// give the data to the caller
54/// pub fn get_data(self) -> Vec<u8> {
55/// self.data
56/// }
57/// }
58///
59/// # #[tokio::main(flavor = "current_thread")]
60/// # async fn main() -> Result<(), Box<dyn std::error::Error>>{
61/// let curl = CurlActor::new();
62/// let mut easy2 = Easy2::new(ResponseHandler::new());
63///
64/// easy2.url("https://www.rust-lang.org").unwrap();
65/// easy2.get(true).unwrap();
66///
67/// let response = curl.send_request(easy2).await.unwrap();
68/// eprintln!("{:?}", response.get_ref());
69///
70/// Ok(())
71/// # }
72/// ```
73///
74/// Example for multiple request executed
75/// at the same time.
76///
77/// ```
78/// use async_curl::actor::{Actor, CurlActor};
79/// use curl::easy::{Easy2, Handler, WriteError};
80///
81/// #[derive(Debug, Clone, Default)]
82/// pub struct ResponseHandler {
83/// data: Vec<u8>,
84/// }
85///
86/// impl Handler for ResponseHandler {
87/// /// This will store the response from the server
88/// /// to the data vector.
89/// fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
90/// self.data.extend_from_slice(data);
91/// Ok(data.len())
92/// }
93/// }
94///
95/// impl ResponseHandler {
96/// /// Instantiation of the ResponseHandler
97/// /// and initialize the data vector.
98/// pub fn new() -> Self {
99/// Self::default()
100/// }
101///
/// /// This will consume the object and
103/// /// give the data to the caller
104/// pub fn get_data(self) -> Vec<u8> {
105/// self.data
106/// }
107/// }
108///
109/// # #[tokio::main(flavor = "current_thread")]
110/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
111/// let actor = CurlActor::new();
112/// let mut easy2 = Easy2::new(ResponseHandler::new());
113/// easy2.url("https://www.rust-lang.org").unwrap();
114/// easy2.get(true).unwrap();
115///
116/// let actor1 = actor.clone();
117/// let spawn1 = tokio::spawn(async move {
118/// let response = actor1.send_request(easy2).await;
119/// let mut response = response.unwrap();
120///
121/// // Response body
122/// eprintln!(
123/// "Task 1 : {}",
124/// String::from_utf8_lossy(&response.get_ref().to_owned().get_data())
125/// );
126/// // Response status code
127/// let status_code = response.response_code().unwrap();
128/// eprintln!("Task 1 : {}", status_code);
129/// });
130///
131/// let mut easy2 = Easy2::new(ResponseHandler::new());
132/// easy2.url("https://www.rust-lang.org").unwrap();
133/// easy2.get(true).unwrap();
134///
135/// let spawn2 = tokio::spawn(async move {
136/// let response = actor.send_request(easy2).await;
137/// let mut response = response.unwrap();
138///
139/// // Response body
140/// eprintln!(
141/// "Task 2 : {}",
142/// String::from_utf8_lossy(&response.get_ref().to_owned().get_data())
143/// );
144/// // Response status code
145/// let status_code = response.response_code().unwrap();
146/// eprintln!("Task 2 : {}", status_code);
147/// });
148/// let (_, _) = tokio::join!(spawn1, spawn2);
149///
150/// Ok(())
151/// # }
152/// ```
153///
154#[derive(Clone)]
155pub struct CurlActor<H>
156where
157 H: Handler + Debug + Send + 'static,
158{
159 request_sender: Sender<Request<H>>,
160}
161
162impl<H> Default for CurlActor<H>
163where
164 H: Handler + Debug + Send + 'static,
165{
166 fn default() -> Self {
167 Self::new()
168 }
169}
170
171#[async_trait]
172impl<H> Actor<H> for CurlActor<H>
173where
174 H: Handler + Debug + Send + 'static,
175{
176 /// This will send Easy2 into the background task that will perform
177 /// curl asynchronously, await the response in the oneshot receiver and
178 /// return Easy2 back to the caller.
179 async fn send_request(&self, easy2: Easy2<H>) -> Result<Easy2<H>, Error<H>> {
180 let (oneshot_sender, oneshot_receiver) = oneshot::channel::<Result<Easy2<H>, Error<H>>>();
181 self.request_sender
182 .send(Request(easy2, oneshot_sender))
183 .await?;
184 oneshot_receiver.await?
185 }
186}
187
188impl<H> CurlActor<H>
189where
190 H: Handler + Debug + Send + 'static,
191{
192 /// This creates the new instance of CurlActor to handle Curl perform asynchronously using Curl Multi
193 /// in a background thread to avoid blocking of other tasks.
194 pub fn new() -> Self {
195 let runtime = Builder::new_current_thread().enable_all().build().unwrap();
196 let (request_sender, request_receiver) = mpsc::channel::<Request<H>>(1);
197
198 Self::spawn_actor(runtime, request_receiver);
199
200 Self { request_sender }
201 }
202
203 /// This creates the new instance of CurlActor to handle Curl perform asynchronously using Curl Multi
204 /// in a background thread to avoid blocking of other tasks. The user can provide a custom runtime
205 /// to use for the background task.
206 pub fn new_runtime(runtime: Runtime) -> Self {
207 let (request_sender, request_receiver) = mpsc::channel::<Request<H>>(1);
208
209 Self::spawn_actor(runtime, request_receiver);
210
211 Self { request_sender }
212 }
213
214 fn spawn_actor(runtime: Runtime, mut request_receiver: Receiver<Request<H>>) {
215 std::thread::spawn(move || {
216 let local = LocalSet::new();
217 local.spawn_local(async move {
218 while let Some(Request(easy2, oneshot_sender)) = request_receiver.recv().await {
219 tokio::task::spawn_local(async move {
220 let response = perform_curl_multi(easy2).await;
221 if let Err(res) = oneshot_sender.send(response) {
222 trace!("Warning! The receiver has been dropped. {:?}", res);
223 }
224 });
225 }
226 });
227 runtime.block_on(local);
228 });
229 }
230}
231
232async fn perform_curl_multi<H: Handler + Debug + Send + 'static>(
233 easy2: Easy2<H>,
234) -> Result<Easy2<H>, Error<H>> {
235 let multi = Multi::new();
236 let handle = multi.add2(easy2).map_err(|e| Error::Multi(e))?;
237
238 while multi.perform().map_err(|e| Error::Multi(e))? != 0 {
239 let timeout_result = multi
240 .get_timeout()
241 .map(|d| d.unwrap_or_else(|| Duration::from_secs(2)));
242
243 let timeout = match timeout_result {
244 Ok(duration) => duration,
245 Err(multi_error) => {
246 if !multi_error.is_call_perform() {
247 return Err(Error::Multi(multi_error));
248 }
249 Duration::ZERO
250 }
251 };
252
253 if !timeout.is_zero() {
254 sleep(Duration::from_millis(200)).await;
255 }
256 }
257
258 let mut error: Option<Error<H>> = None;
259 multi.messages(|msg| {
260 if let Some(Err(e)) = msg.result() {
261 error = Some(Error::Curl(e));
262 }
263 });
264
265 if let Some(e) = error {
266 Err(e)
267 } else {
268 multi.remove2(handle).map_err(|e| Error::Multi(e))
269 }
270}
271
272/// This contains the Easy2 object and a oneshot sender channel when passing into the
273/// background task to perform Curl asynchronously.
274#[derive(Debug)]
275pub struct Request<H: Handler + Debug + Send + 'static>(
276 Easy2<H>,
277 oneshot::Sender<Result<Easy2<H>, Error<H>>>,
278);