curl_http_client/collector.rs
1use std::fmt::Debug;
2use std::io::Read;
3use std::sync::{Arc, Mutex};
4use std::time::Instant;
5use std::{
6 fs::{File, OpenOptions},
7 io::{Seek, SeekFrom, Write},
8 path::PathBuf,
9};
10
11use curl::easy::{Handler, ReadError, WriteError};
12use derive_deref_rs::Deref;
13use http::{HeaderMap, HeaderName, HeaderValue};
14use log::trace;
15use tokio::sync::mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender};
16
/// Transfer (download/upload) rate that is passed between tasks.
///
/// The wrapped value is a rate in bytes per second, stored as `f64`. Read it
/// back with [`TransferSpeed::as_bytes_per_sec`] to display it in whatever form
/// the application needs.
#[derive(Debug, Clone)]
pub struct TransferSpeed(f64);

impl TransferSpeed {
    /// Returns the rate as whole bytes per second.
    ///
    /// The fractional part is discarded. Because this is a float-to-integer
    /// `as` cast, negative and `NaN` rates yield `0` and rates beyond the
    /// `u64` range saturate at `u64::MAX`.
    pub fn as_bytes_per_sec(&self) -> u64 {
        let Self(bytes_per_sec) = self;
        *bytes_per_sec as u64
    }
}

/// Builds a speed from an unsigned 64-bit bytes-per-second count.
impl From<u64> for TransferSpeed {
    fn from(value: u64) -> Self {
        TransferSpeed(value as f64)
    }
}

/// Builds a speed from a pointer-sized bytes-per-second count.
impl From<usize> for TransferSpeed {
    fn from(value: usize) -> Self {
        TransferSpeed(value as f64)
    }
}

/// Builds a speed from a signed 32-bit bytes-per-second count.
///
/// This is also the impl an unsuffixed integer literal such as
/// `TransferSpeed::from(0)` resolves to.
impl From<i32> for TransferSpeed {
    fn from(value: i32) -> Self {
        // `i32 -> f64` is lossless, so this is the same value as `value as f64`.
        TransferSpeed(f64::from(value))
    }
}

/// Builds a speed from a signed 64-bit bytes-per-second count.
impl From<i64> for TransferSpeed {
    fn from(value: i64) -> Self {
        TransferSpeed(value as f64)
    }
}

/// Wraps an already computed floating-point bytes-per-second rate.
impl From<f64> for TransferSpeed {
    fn from(value: f64) -> Self {
        TransferSpeed(value)
    }
}
58
59/// AbortPerform is a flag that can be safely shared across threads to be able to cancel Curl perform operation
60/// via progress function of the Collector.
61#[derive(Deref, Clone, Debug)]
62pub struct AbortPerform {
63 abort: Arc<Mutex<bool>>,
64}
65
66impl AbortPerform {
67 /// Creates a new AbortPerform object with false as the default value.
68 pub fn new() -> Self {
69 Self {
70 abort: Arc::new(Mutex::new(false)),
71 }
72 }
73}
74
75impl Default for AbortPerform {
76 fn default() -> Self {
77 Self::new()
78 }
79}
80
81/// Stores the path for the downloaded file or the uploaded file.
82/// Internally it will also monitor the bytes transferred and the Download/Upload speed.
83#[derive(Clone, Debug)]
84pub struct FileInfo {
85 /// File path to download or file path of the source file to be uploaded.
86 pub path: PathBuf,
87 /// Sends the transfer speed information via channel to another task.
88 /// This is an optional parameter depends on the user application.
89 send_speed_info: Option<Sender<TransferSpeed>>,
90 bytes_transferred: usize,
91 transfer_started: Instant,
92 transfer_speed: TransferSpeed,
93 abort: Option<AbortPerform>,
94}
95
96impl FileInfo {
97 /// Sets the destination file path to download or file path of the source file to be uploaded.
98 pub fn path(path: PathBuf) -> Self {
99 Self {
100 path,
101 send_speed_info: None,
102 bytes_transferred: 0,
103 transfer_started: Instant::now(),
104 transfer_speed: TransferSpeed::from(0),
105 abort: None,
106 }
107 }
108
109 /// Sets the FileInfo struct with a message passing channel to send transfer speed information across user applications.
110 /// It uses a tokio bounded channel to send the information across tasks.
111 pub fn with_transfer_speed_sender(mut self, send_speed_info: Sender<TransferSpeed>) -> Self {
112 self.send_speed_info = Some(send_speed_info);
113 self
114 }
115
116 /// Set the FileInfo struct with a perform aborter.
117 /// AbortPerform is a shared flag across threads to be able to switch this flag to true to abort the curl perform.
118 pub fn with_perform_aborter(mut self, abort: AbortPerform) -> Self {
119 self.abort = Some(abort);
120 self
121 }
122
123 fn update_bytes_transferred(&mut self, transferred: usize) {
124 self.bytes_transferred += transferred;
125
126 let now = Instant::now();
127 let difference = now.duration_since(self.transfer_started);
128
129 self.transfer_speed =
130 TransferSpeed::from((self.bytes_transferred) as f64 / difference.as_secs_f64());
131 }
132
133 fn bytes_transferred(&self) -> usize {
134 self.bytes_transferred
135 }
136
137 fn transfer_speed(&self) -> TransferSpeed {
138 self.transfer_speed.clone()
139 }
140}
141
142fn send_transfer_info(info: &FileInfo) {
143 if let Some(tx) = info.send_speed_info.clone() {
144 let transfer_speed = info.transfer_speed();
145 tokio::spawn(async move {
146 tx.send(transfer_speed).await.map_err(|e| {
147 trace!("{:?}", e);
148 })
149 });
150 }
151}
152
153/// `StreamHandler` is a lightweight helper for managing streamed responses.
154///
155/// It provides two main components:
156/// - `chunk_sender`: An asynchronous channel sender used to forward each
157/// received chunk of data to a consumer in real time.
158/// - `abort`: An optional abort flag that can be used to stop an ongoing
159/// curl perform operation from another thread.
160///
161/// This struct is typically used in combination with the `Collector::Streaming`
162/// variant to enable progressive consumption of large or continuous responses
163/// without buffering everything in memory.
164#[derive(Clone, Debug)]
165pub struct StreamHandler {
166 /// Asynchronous sender that delivers streamed response chunks (`Vec<u8>`)
167 /// to a receiving end. Each received chunk is sent immediately, enabling
168 /// consumers to process data progressively.
169 chunk_sender: UnboundedSender<Vec<u8>>,
170
171 /// Optional abort handle (`AbortPerform`) that can be set to signal
172 /// cancellation of the ongoing curl perform operation.
173 ///
174 /// When triggered, this flag instructs the underlying transfer to stop early.
175 abort: Option<AbortPerform>,
176}
177
178impl StreamHandler {
179 /// Creates a new `StreamHandler` along with its corresponding receiver.
180 ///
181 /// This convenience method sets up a channel for streaming data,
182 /// returning both the `StreamHandler` (containing the sender) and
183 /// the `UnboundedReceiver` for consuming chunks.
184 ///
185 /// # Returns
186 /// A tuple of:
187 /// - `StreamHandler`: The handler containing the sender and optional abort flag.
188 /// - `UnboundedReceiver<Vec<u8>>`: The receiving end for streamed chunks.
189 pub fn new() -> (Self, UnboundedReceiver<Vec<u8>>) {
190 let (tx, rx) = unbounded_channel();
191 (
192 Self {
193 chunk_sender: tx,
194 abort: None,
195 },
196 rx,
197 )
198 }
199
200 /// Associates an abort handle with this `StreamHandler`.
201 ///
202 /// `AbortPerform` is a shared flag across threads that can be toggled
203 /// to `true` in order to abort the curl perform operation prematurely.
204 ///
205 /// # Example
206 /// ```
207 /// use curl_http_client::AbortPerform;
208 /// use curl_http_client::StreamHandler;
209 ///
210 /// let (handler, rx) = StreamHandler::new();
211 /// let aborter = AbortPerform::new();
212 /// let handler = handler.with_perform_aborter(aborter);
213 /// ```
214 pub fn with_perform_aborter(mut self, abort: AbortPerform) -> Self {
215 self.abort = Some(abort);
216 self
217 }
218}
219
220fn send_stream_data(stream: &StreamHandler, data: Vec<u8>) {
221 let tx = stream.chunk_sender.clone();
222
223 let _ = tx.send(data).map_err(|e| {
224 trace!("{:?}", e);
225 });
226}
227
228/// This is an extended trait for the curl::easy::Handler trait.
229pub trait ExtendedHandler: Handler {
230 // Return the response body if the Collector is available.
231 fn get_response_body(&self) -> Option<Vec<u8>> {
232 None
233 }
234 // Return the response body if the Collector is available with complete headers.
235 fn get_response_body_and_headers(&self) -> (Option<Vec<u8>>, Option<HeaderMap>) {
236 (None, None)
237 }
238}
239
240/// Collector::File(FileInfo) is used to be able to download and upload files.
241/// Collector::Ram(`Vec<u8>`) is used to store response body into Memory.
242/// Collector::RamWithHeaders(`Vec<u8>`, `Vec<u8>`) is used to store response body into Memory and with complete headers.
243/// Collector::FileAndHeaders(`FileInfo`, `Vec<u8>`) is used to be able to download and upload files and with complete headers.
244/// Collector::Streaming(`StreamHandler`, `Vec<u8>`) is used to process the response body as a **stream of chunks** instead of buffering the entire
245/// response in memory or writing it directly to a file.
246#[derive(Clone, Debug)]
247pub enum Collector {
248 /// Collector::File(`FileInfo`) is used to be able to download and upload files.
249 File(FileInfo),
250 /// Collector::Ram(`Vec<u8>`) is used to store response body into Memory.
251 Ram(Vec<u8>),
252 /// Collector::RamWithHeaders(`Vec<u8>`, `Vec<u8>`) is used to store response body into Memory and with complete headers.
253 RamAndHeaders(Vec<u8>, Vec<u8>),
254 /// Collector::FileAndHeaders(`FileInfo`, `Vec<u8>`) is used to be able to download and upload files and with complete headers.
255 FileAndHeaders(FileInfo, Vec<u8>),
256 /// Collector::Streaming(`StreamHandler`, `Vec<u8>`) is used to process
257 /// the response body as a **stream of chunks** instead of buffering the entire
258 /// response in memory or writing it directly to a file with complete headers.
259 /// This is useful for large responses, continuous data feeds, or situations
260 /// where you don’t want to block until the full body is received.
261 Streaming(StreamHandler, Vec<u8>),
262}
263
264impl Handler for Collector {
265 /// This will store the response from the server
266 /// to the data vector or into a file depends on the
267 /// Collector being used.
268 fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
269 match self {
270 Collector::File(info) => {
271 let mut file = OpenOptions::new()
272 .create(true)
273 .append(true)
274 .open(info.path.clone())
275 .map_err(|e| {
276 trace!("{}", e);
277 WriteError::Pause
278 })?;
279
280 file.write_all(data).map_err(|e| {
281 trace!("{}", e);
282 WriteError::Pause
283 })?;
284
285 info.update_bytes_transferred(data.len());
286
287 send_transfer_info(info);
288 Ok(data.len())
289 }
290 Collector::Ram(container) => {
291 container.extend_from_slice(data);
292 Ok(data.len())
293 }
294 Collector::RamAndHeaders(container, _) => {
295 container.extend_from_slice(data);
296 Ok(data.len())
297 }
298 Collector::FileAndHeaders(info, _) => {
299 let mut file = OpenOptions::new()
300 .create(true)
301 .append(true)
302 .open(info.path.clone())
303 .map_err(|e| {
304 trace!("{}", e);
305 WriteError::Pause
306 })?;
307
308 file.write_all(data).map_err(|e| {
309 trace!("{}", e);
310 WriteError::Pause
311 })?;
312
313 info.update_bytes_transferred(data.len());
314
315 send_transfer_info(info);
316 Ok(data.len())
317 }
318 Collector::Streaming(stream, _) => {
319 send_stream_data(stream, data.to_vec());
320 Ok(data.len())
321 }
322 }
323 }
324 /// This will read the chunks of data from a file that will be uploaded
325 /// to the server. This will be use if the Collector is Collector::File(FileInfo).
326 fn read(&mut self, data: &mut [u8]) -> Result<usize, ReadError> {
327 match self {
328 Collector::File(info) => {
329 let mut file = File::open(info.path.clone()).map_err(|e| {
330 trace!("{}", e);
331 ReadError::Abort
332 })?;
333
334 file.seek(SeekFrom::Start(info.bytes_transferred() as u64))
335 .map_err(|e| {
336 trace!("{}", e);
337 ReadError::Abort
338 })?;
339
340 let read_size = file.read(data).map_err(|e| {
341 trace!("{}", e);
342 ReadError::Abort
343 })?;
344
345 info.update_bytes_transferred(read_size);
346
347 send_transfer_info(info);
348 Ok(read_size)
349 }
350 Collector::Ram(_) => Ok(0),
351 Collector::RamAndHeaders(_, _) => Ok(0),
352 Collector::FileAndHeaders(info, _) => {
353 let mut file = File::open(info.path.clone()).map_err(|e| {
354 trace!("{}", e);
355 ReadError::Abort
356 })?;
357
358 file.seek(SeekFrom::Start(info.bytes_transferred() as u64))
359 .map_err(|e| {
360 trace!("{}", e);
361 ReadError::Abort
362 })?;
363
364 let read_size = file.read(data).map_err(|e| {
365 trace!("{}", e);
366 ReadError::Abort
367 })?;
368
369 info.update_bytes_transferred(read_size);
370
371 send_transfer_info(info);
372 Ok(read_size)
373 }
374 Collector::Streaming(_, _) => Ok(0),
375 }
376 }
377
378 fn header(&mut self, data: &[u8]) -> bool {
379 match self {
380 Collector::File(_) => {}
381 Collector::Ram(_) => {}
382 Collector::RamAndHeaders(_, headers) => {
383 headers.extend_from_slice(data);
384 }
385 Collector::FileAndHeaders(_, headers) => {
386 headers.extend_from_slice(data);
387 }
388 Collector::Streaming(_, headers) => {
389 headers.extend_from_slice(data);
390 }
391 }
392 true
393 }
394
395 fn progress(&mut self, dltotal: f64, dlnow: f64, ultotal: f64, ulnow: f64) -> bool {
396 trace!("dltotal: {dltotal} dlnow: {dlnow} ultotal: {ultotal} ulnow: {ulnow}");
397 match self {
398 Collector::File(file_info) | Collector::FileAndHeaders(file_info, _) => {
399 if let Some(abort) = &file_info.abort {
400 abort.lock().map(|a| !*a).unwrap_or_else(|err| {
401 log::error!("{:?}", err);
402 false
403 })
404 } else {
405 true
406 }
407 }
408 Collector::Ram(_) | Collector::RamAndHeaders(_, _) => true,
409 Collector::Streaming(stream, _) => {
410 if let Some(abort) = &stream.abort {
411 abort.lock().map(|a| !*a).unwrap_or_else(|err| {
412 log::error!("{:?}", err);
413 false
414 })
415 } else {
416 true
417 }
418 }
419 }
420 }
421}
422
423impl ExtendedHandler for Collector {
424 /// If Collector::File(FileInfo) is set, there will be no response body since the response
425 /// will be stored into a file.
426 ///
427 /// If Collector::Ram(`Vec<u8>`) is set, the response body can be obtain here.
428 fn get_response_body(&self) -> Option<Vec<u8>> {
429 match self {
430 Collector::File(_) => None,
431 Collector::Ram(container) => {
432 if container.is_empty() {
433 None
434 } else {
435 Some(container.clone())
436 }
437 }
438 Collector::RamAndHeaders(container, _) => {
439 if container.is_empty() {
440 None
441 } else {
442 Some(container.clone())
443 }
444 }
445 Collector::FileAndHeaders(_, _) => None,
446 Collector::Streaming(_, _) => None,
447 }
448 }
449
450 /// If Collector::File(`FileInfo`) is set, there will be no response body since the response will be stored into a file.
451 /// If Collector::Ram(`Vec<u8>`) is set, the response body can be obtain here.
452 /// If Collector::RamAndHeaders(`Vec<u8>`, `Vec<u8>`) is set, the response body and the complete headers are generated.
453 /// If Collector::FileAndHeaders(`FileInfo`, `Vec<u8>`) is set, there will be no response body since the response will be stored into a file but a complete headers are generated.
454 fn get_response_body_and_headers(&self) -> (Option<Vec<u8>>, Option<HeaderMap>) {
455 match self {
456 Collector::File(_) => (None, None),
457 Collector::Ram(container) => {
458 if container.is_empty() {
459 (None, None)
460 } else {
461 (Some(container.clone()), None)
462 }
463 }
464 Collector::RamAndHeaders(container, headers) => {
465 let header_str = std::str::from_utf8(headers).unwrap();
466 let mut header_map = HeaderMap::new();
467
468 for line in header_str.lines() {
469 // Split each line into key-value pairs
470 if let Some((key, value)) = line.split_once(": ").to_owned() {
471 if let Ok(header_name) = HeaderName::from_bytes(key.as_bytes()) {
472 if let Ok(header_value) = HeaderValue::from_str(value) {
473 // Insert the key-value pair into the HeaderMap
474 header_map.insert(header_name, header_value);
475 }
476 }
477 }
478 }
479 if container.is_empty() {
480 (None, Some(header_map))
481 } else {
482 (Some(container.clone()), Some(header_map))
483 }
484 }
485 Collector::FileAndHeaders(_, headers) => {
486 let header_str = std::str::from_utf8(headers).unwrap();
487 let mut header_map = HeaderMap::new();
488
489 for line in header_str.lines() {
490 // Split each line into key-value pairs
491 if let Some((key, value)) = line.split_once(": ").to_owned() {
492 if let Ok(header_name) = HeaderName::from_bytes(key.as_bytes()) {
493 if let Ok(header_value) = HeaderValue::from_str(value) {
494 // Insert the key-value pair into the HeaderMap
495 header_map.insert(header_name, header_value);
496 }
497 }
498 }
499 }
500 (None, Some(header_map))
501 }
502 Collector::Streaming(_, headers) => {
503 let header_str = std::str::from_utf8(headers).unwrap();
504 let mut header_map = HeaderMap::new();
505
506 for line in header_str.lines() {
507 // Split each line into key-value pairs
508 if let Some((key, value)) = line.split_once(": ").to_owned() {
509 if let Ok(header_name) = HeaderName::from_bytes(key.as_bytes()) {
510 if let Ok(header_value) = HeaderValue::from_str(value) {
511 // Insert the key-value pair into the HeaderMap
512 header_map.insert(header_name, header_value);
513 }
514 }
515 }
516 }
517 (None, Some(header_map))
518 }
519 }
520 }
521}