curl_http_client/
collector.rs

1use std::fmt::Debug;
2use std::io::Read;
3use std::sync::{Arc, Mutex};
4use std::time::Instant;
5use std::{
6    fs::{File, OpenOptions},
7    io::{Seek, SeekFrom, Write},
8    path::PathBuf,
9};
10
11use curl::easy::{Handler, ReadError, WriteError};
12use derive_deref_rs::Deref;
13use http::{HeaderMap, HeaderName, HeaderValue};
14use log::trace;
15use tokio::sync::mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender};
16
/// Transfer (download/upload) speed, in bytes per second, that can be passed
/// between tasks so an application can display it however it likes.
#[derive(Clone, Debug)]
pub struct TransferSpeed(f64);

impl TransferSpeed {
    /// Returns the speed truncated to whole bytes per second.
    ///
    /// The conversion is a saturating `as` cast: negative or NaN speeds become
    /// `0`, and values beyond `u64::MAX` clamp to `u64::MAX`.
    pub fn as_bytes_per_sec(&self) -> u64 {
        let &TransferSpeed(bytes_per_sec) = self;
        bytes_per_sec as u64
    }
}
28
29impl From<u64> for TransferSpeed {
30    fn from(value: u64) -> Self {
31        Self(value as f64)
32    }
33}
34
35impl From<usize> for TransferSpeed {
36    fn from(value: usize) -> Self {
37        Self(value as f64)
38    }
39}
40
41impl From<i32> for TransferSpeed {
42    fn from(value: i32) -> Self {
43        Self(value as f64)
44    }
45}
46
47impl From<i64> for TransferSpeed {
48    fn from(value: i64) -> Self {
49        Self(value as f64)
50    }
51}
52
53impl From<f64> for TransferSpeed {
54    fn from(value: f64) -> Self {
55        Self(value)
56    }
57}
58
59/// AbortPerform is a flag that can be safely shared across threads to be able to cancel Curl perform operation
60/// via progress function of the Collector.
61#[derive(Deref, Clone, Debug)]
62pub struct AbortPerform {
63    abort: Arc<Mutex<bool>>,
64}
65
66impl AbortPerform {
67    /// Creates a new AbortPerform object with false as the default value.
68    pub fn new() -> Self {
69        Self {
70            abort: Arc::new(Mutex::new(false)),
71        }
72    }
73}
74
75impl Default for AbortPerform {
76    fn default() -> Self {
77        Self::new()
78    }
79}
80
81/// Stores the path for the downloaded file or the uploaded file.
82/// Internally it will also monitor the bytes transferred and the Download/Upload speed.
83#[derive(Clone, Debug)]
84pub struct FileInfo {
85    /// File path to download or file path of the source file to be uploaded.
86    pub path: PathBuf,
87    /// Sends the transfer speed information via channel to another task.
88    /// This is an optional parameter depends on the user application.
89    send_speed_info: Option<Sender<TransferSpeed>>,
90    bytes_transferred: usize,
91    transfer_started: Instant,
92    transfer_speed: TransferSpeed,
93    abort: Option<AbortPerform>,
94}
95
96impl FileInfo {
97    /// Sets the destination file path to download or file path of the source file to be uploaded.
98    pub fn path(path: PathBuf) -> Self {
99        Self {
100            path,
101            send_speed_info: None,
102            bytes_transferred: 0,
103            transfer_started: Instant::now(),
104            transfer_speed: TransferSpeed::from(0),
105            abort: None,
106        }
107    }
108
109    /// Sets the FileInfo struct with a message passing channel to send transfer speed information across user applications.
110    /// It uses a tokio bounded channel to send the information across tasks.
111    pub fn with_transfer_speed_sender(mut self, send_speed_info: Sender<TransferSpeed>) -> Self {
112        self.send_speed_info = Some(send_speed_info);
113        self
114    }
115
116    /// Set the FileInfo struct with a perform aborter.
117    /// AbortPerform is a shared flag across threads to be able to switch this flag to true to abort the curl perform.
118    pub fn with_perform_aborter(mut self, abort: AbortPerform) -> Self {
119        self.abort = Some(abort);
120        self
121    }
122
123    fn update_bytes_transferred(&mut self, transferred: usize) {
124        self.bytes_transferred += transferred;
125
126        let now = Instant::now();
127        let difference = now.duration_since(self.transfer_started);
128
129        self.transfer_speed =
130            TransferSpeed::from((self.bytes_transferred) as f64 / difference.as_secs_f64());
131    }
132
133    fn bytes_transferred(&self) -> usize {
134        self.bytes_transferred
135    }
136
137    fn transfer_speed(&self) -> TransferSpeed {
138        self.transfer_speed.clone()
139    }
140}
141
142fn send_transfer_info(info: &FileInfo) {
143    if let Some(tx) = info.send_speed_info.clone() {
144        let transfer_speed = info.transfer_speed();
145        tokio::spawn(async move {
146            tx.send(transfer_speed).await.map_err(|e| {
147                trace!("{:?}", e);
148            })
149        });
150    }
151}
152
153/// `StreamHandler` is a lightweight helper for managing streamed responses.
154///
155/// It provides two main components:
156/// - `chunk_sender`: An asynchronous channel sender used to forward each
157///   received chunk of data to a consumer in real time.
158/// - `abort`: An optional abort flag that can be used to stop an ongoing
159///   curl perform operation from another thread.
160///
161/// This struct is typically used in combination with the `Collector::Streaming`
162/// variant to enable progressive consumption of large or continuous responses
163/// without buffering everything in memory.
164#[derive(Clone, Debug)]
165pub struct StreamHandler {
166    /// Asynchronous sender that delivers streamed response chunks (`Vec<u8>`)
167    /// to a receiving end. Each received chunk is sent immediately, enabling
168    /// consumers to process data progressively.
169    chunk_sender: UnboundedSender<Vec<u8>>,
170
171    /// Optional abort handle (`AbortPerform`) that can be set to signal
172    /// cancellation of the ongoing curl perform operation.
173    ///
174    /// When triggered, this flag instructs the underlying transfer to stop early.
175    abort: Option<AbortPerform>,
176}
177
178impl StreamHandler {
179    /// Creates a new `StreamHandler` along with its corresponding receiver.
180    ///
181    /// This convenience method sets up a channel for streaming data,
182    /// returning both the `StreamHandler` (containing the sender) and
183    /// the `UnboundedReceiver` for consuming chunks.
184    ///
185    /// # Returns
186    /// A tuple of:
187    /// - `StreamHandler`: The handler containing the sender and optional abort flag.
188    /// - `UnboundedReceiver<Vec<u8>>`: The receiving end for streamed chunks.
189    pub fn new() -> (Self, UnboundedReceiver<Vec<u8>>) {
190        let (tx, rx) = unbounded_channel();
191        (
192            Self {
193                chunk_sender: tx,
194                abort: None,
195            },
196            rx,
197        )
198    }
199
200    /// Associates an abort handle with this `StreamHandler`.
201    ///
202    /// `AbortPerform` is a shared flag across threads that can be toggled
203    /// to `true` in order to abort the curl perform operation prematurely.
204    ///
205    /// # Example
206    /// ```
207    /// use curl_http_client::AbortPerform;
208    /// use curl_http_client::StreamHandler;
209    ///
210    /// let (handler, rx) = StreamHandler::new();
211    /// let aborter = AbortPerform::new();
212    /// let handler = handler.with_perform_aborter(aborter);
213    /// ```
214    pub fn with_perform_aborter(mut self, abort: AbortPerform) -> Self {
215        self.abort = Some(abort);
216        self
217    }
218}
219
220fn send_stream_data(stream: &StreamHandler, data: Vec<u8>) {
221    let tx = stream.chunk_sender.clone();
222
223    let _ = tx.send(data).map_err(|e| {
224        trace!("{:?}", e);
225    });
226}
227
228/// This is an extended trait for the curl::easy::Handler trait.
229pub trait ExtendedHandler: Handler {
230    // Return the response body if the Collector is available.
231    fn get_response_body(&self) -> Option<Vec<u8>> {
232        None
233    }
234    // Return the response body if the Collector is available with complete headers.
235    fn get_response_body_and_headers(&self) -> (Option<Vec<u8>>, Option<HeaderMap>) {
236        (None, None)
237    }
238}
239
240/// Collector::File(FileInfo) is used to be able to download and upload files.
241/// Collector::Ram(`Vec<u8>`) is used to store response body into Memory.
242/// Collector::RamWithHeaders(`Vec<u8>`, `Vec<u8>`) is used to store response body into Memory and with complete headers.
243/// Collector::FileAndHeaders(`FileInfo`, `Vec<u8>`) is used to be able to download and upload files and with complete headers.
244/// Collector::Streaming(`StreamHandler`, `Vec<u8>`) is used to process the response body as a **stream of chunks** instead of buffering the entire
245/// response in memory or writing it directly to a file.
246#[derive(Clone, Debug)]
247pub enum Collector {
248    /// Collector::File(`FileInfo`) is used to be able to download and upload files.
249    File(FileInfo),
250    /// Collector::Ram(`Vec<u8>`) is used to store response body into Memory.
251    Ram(Vec<u8>),
252    /// Collector::RamWithHeaders(`Vec<u8>`, `Vec<u8>`) is used to store response body into Memory and with complete headers.
253    RamAndHeaders(Vec<u8>, Vec<u8>),
254    /// Collector::FileAndHeaders(`FileInfo`, `Vec<u8>`) is used to be able to download and upload files and with complete headers.
255    FileAndHeaders(FileInfo, Vec<u8>),
256    /// Collector::Streaming(`StreamHandler`, `Vec<u8>`) is used to process
257    /// the response body as a **stream of chunks** instead of buffering the entire
258    /// response in memory or writing it directly to a file with complete headers.
259    /// This is useful for large responses, continuous data feeds, or situations
260    /// where you don’t want to block until the full body is received.
261    Streaming(StreamHandler, Vec<u8>),
262}
263
264impl Handler for Collector {
265    /// This will store the response from the server
266    /// to the data vector or into a file depends on the
267    /// Collector being used.
268    fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
269        match self {
270            Collector::File(info) => {
271                let mut file = OpenOptions::new()
272                    .create(true)
273                    .append(true)
274                    .open(info.path.clone())
275                    .map_err(|e| {
276                        trace!("{}", e);
277                        WriteError::Pause
278                    })?;
279
280                file.write_all(data).map_err(|e| {
281                    trace!("{}", e);
282                    WriteError::Pause
283                })?;
284
285                info.update_bytes_transferred(data.len());
286
287                send_transfer_info(info);
288                Ok(data.len())
289            }
290            Collector::Ram(container) => {
291                container.extend_from_slice(data);
292                Ok(data.len())
293            }
294            Collector::RamAndHeaders(container, _) => {
295                container.extend_from_slice(data);
296                Ok(data.len())
297            }
298            Collector::FileAndHeaders(info, _) => {
299                let mut file = OpenOptions::new()
300                    .create(true)
301                    .append(true)
302                    .open(info.path.clone())
303                    .map_err(|e| {
304                        trace!("{}", e);
305                        WriteError::Pause
306                    })?;
307
308                file.write_all(data).map_err(|e| {
309                    trace!("{}", e);
310                    WriteError::Pause
311                })?;
312
313                info.update_bytes_transferred(data.len());
314
315                send_transfer_info(info);
316                Ok(data.len())
317            }
318            Collector::Streaming(stream, _) => {
319                send_stream_data(stream, data.to_vec());
320                Ok(data.len())
321            }
322        }
323    }
324    /// This will read the chunks of data from a file that will be uploaded
325    /// to the server. This will be use if the Collector is Collector::File(FileInfo).
326    fn read(&mut self, data: &mut [u8]) -> Result<usize, ReadError> {
327        match self {
328            Collector::File(info) => {
329                let mut file = File::open(info.path.clone()).map_err(|e| {
330                    trace!("{}", e);
331                    ReadError::Abort
332                })?;
333
334                file.seek(SeekFrom::Start(info.bytes_transferred() as u64))
335                    .map_err(|e| {
336                        trace!("{}", e);
337                        ReadError::Abort
338                    })?;
339
340                let read_size = file.read(data).map_err(|e| {
341                    trace!("{}", e);
342                    ReadError::Abort
343                })?;
344
345                info.update_bytes_transferred(read_size);
346
347                send_transfer_info(info);
348                Ok(read_size)
349            }
350            Collector::Ram(_) => Ok(0),
351            Collector::RamAndHeaders(_, _) => Ok(0),
352            Collector::FileAndHeaders(info, _) => {
353                let mut file = File::open(info.path.clone()).map_err(|e| {
354                    trace!("{}", e);
355                    ReadError::Abort
356                })?;
357
358                file.seek(SeekFrom::Start(info.bytes_transferred() as u64))
359                    .map_err(|e| {
360                        trace!("{}", e);
361                        ReadError::Abort
362                    })?;
363
364                let read_size = file.read(data).map_err(|e| {
365                    trace!("{}", e);
366                    ReadError::Abort
367                })?;
368
369                info.update_bytes_transferred(read_size);
370
371                send_transfer_info(info);
372                Ok(read_size)
373            }
374            Collector::Streaming(_, _) => Ok(0),
375        }
376    }
377
378    fn header(&mut self, data: &[u8]) -> bool {
379        match self {
380            Collector::File(_) => {}
381            Collector::Ram(_) => {}
382            Collector::RamAndHeaders(_, headers) => {
383                headers.extend_from_slice(data);
384            }
385            Collector::FileAndHeaders(_, headers) => {
386                headers.extend_from_slice(data);
387            }
388            Collector::Streaming(_, headers) => {
389                headers.extend_from_slice(data);
390            }
391        }
392        true
393    }
394
395    fn progress(&mut self, dltotal: f64, dlnow: f64, ultotal: f64, ulnow: f64) -> bool {
396        trace!("dltotal: {dltotal} dlnow: {dlnow} ultotal: {ultotal} ulnow: {ulnow}");
397        match self {
398            Collector::File(file_info) | Collector::FileAndHeaders(file_info, _) => {
399                if let Some(abort) = &file_info.abort {
400                    abort.lock().map(|a| !*a).unwrap_or_else(|err| {
401                        log::error!("{:?}", err);
402                        false
403                    })
404                } else {
405                    true
406                }
407            }
408            Collector::Ram(_) | Collector::RamAndHeaders(_, _) => true,
409            Collector::Streaming(stream, _) => {
410                if let Some(abort) = &stream.abort {
411                    abort.lock().map(|a| !*a).unwrap_or_else(|err| {
412                        log::error!("{:?}", err);
413                        false
414                    })
415                } else {
416                    true
417                }
418            }
419        }
420    }
421}
422
423impl ExtendedHandler for Collector {
424    /// If Collector::File(FileInfo) is set, there will be no response body since the response
425    /// will be stored into a file.
426    ///
427    /// If Collector::Ram(`Vec<u8>`) is set, the response body can be obtain here.
428    fn get_response_body(&self) -> Option<Vec<u8>> {
429        match self {
430            Collector::File(_) => None,
431            Collector::Ram(container) => {
432                if container.is_empty() {
433                    None
434                } else {
435                    Some(container.clone())
436                }
437            }
438            Collector::RamAndHeaders(container, _) => {
439                if container.is_empty() {
440                    None
441                } else {
442                    Some(container.clone())
443                }
444            }
445            Collector::FileAndHeaders(_, _) => None,
446            Collector::Streaming(_, _) => None,
447        }
448    }
449
450    /// If Collector::File(`FileInfo`) is set, there will be no response body since the response will be stored into a file.
451    /// If Collector::Ram(`Vec<u8>`) is set, the response body can be obtain here.
452    /// If Collector::RamAndHeaders(`Vec<u8>`, `Vec<u8>`) is set, the response body and the complete headers are generated.
453    /// If Collector::FileAndHeaders(`FileInfo`, `Vec<u8>`) is set, there will be no response body since the response will be stored into a file but a complete headers are generated.
454    fn get_response_body_and_headers(&self) -> (Option<Vec<u8>>, Option<HeaderMap>) {
455        match self {
456            Collector::File(_) => (None, None),
457            Collector::Ram(container) => {
458                if container.is_empty() {
459                    (None, None)
460                } else {
461                    (Some(container.clone()), None)
462                }
463            }
464            Collector::RamAndHeaders(container, headers) => {
465                let header_str = std::str::from_utf8(headers).unwrap();
466                let mut header_map = HeaderMap::new();
467
468                for line in header_str.lines() {
469                    // Split each line into key-value pairs
470                    if let Some((key, value)) = line.split_once(": ").to_owned() {
471                        if let Ok(header_name) = HeaderName::from_bytes(key.as_bytes()) {
472                            if let Ok(header_value) = HeaderValue::from_str(value) {
473                                // Insert the key-value pair into the HeaderMap
474                                header_map.insert(header_name, header_value);
475                            }
476                        }
477                    }
478                }
479                if container.is_empty() {
480                    (None, Some(header_map))
481                } else {
482                    (Some(container.clone()), Some(header_map))
483                }
484            }
485            Collector::FileAndHeaders(_, headers) => {
486                let header_str = std::str::from_utf8(headers).unwrap();
487                let mut header_map = HeaderMap::new();
488
489                for line in header_str.lines() {
490                    // Split each line into key-value pairs
491                    if let Some((key, value)) = line.split_once(": ").to_owned() {
492                        if let Ok(header_name) = HeaderName::from_bytes(key.as_bytes()) {
493                            if let Ok(header_value) = HeaderValue::from_str(value) {
494                                // Insert the key-value pair into the HeaderMap
495                                header_map.insert(header_name, header_value);
496                            }
497                        }
498                    }
499                }
500                (None, Some(header_map))
501            }
502            Collector::Streaming(_, headers) => {
503                let header_str = std::str::from_utf8(headers).unwrap();
504                let mut header_map = HeaderMap::new();
505
506                for line in header_str.lines() {
507                    // Split each line into key-value pairs
508                    if let Some((key, value)) = line.split_once(": ").to_owned() {
509                        if let Ok(header_name) = HeaderName::from_bytes(key.as_bytes()) {
510                            if let Ok(header_value) = HeaderValue::from_str(value) {
511                                // Insert the key-value pair into the HeaderMap
512                                header_map.insert(header_name, header_value);
513                            }
514                        }
515                    }
516                }
517                (None, Some(header_map))
518            }
519        }
520    }
521}