// qiniu_multipart/client/lazy.rs

1//! Multipart requests which write out their data in one fell swoop.
2use mime::Mime;
3
4use std::borrow::Cow;
5use std::error::Error;
6use std::fs::File;
7use std::path::{Path, PathBuf};
8
9use std::io::prelude::*;
10use std::io::Cursor;
11use std::{fmt, io};
12
13use super::{HttpRequest, HttpStream};
14
/// Unwraps the `Ok` value of a `Result`, or returns early from the enclosing function
/// with a `LazyError` built from the `Err` value.
///
/// * `try_lazy!(field, expr)` attaches `field` (converted with `.into()`) to the error.
/// * `try_lazy!(expr)` produces an error that is not associated with any field.
///
/// `field` is only evaluated on the error path, so the same binding can be handed to
/// several consecutive invocations without being moved on success.
macro_rules! try_lazy {
    ($field:expr, $try:expr) => {
        match $try {
            Err(err) => return Err(LazyError::with_field($field.into(), err)),
            Ok(value) => value,
        }
    };
    ($try:expr) => {
        match $try {
            Err(err) => return Err(LazyError::without_field(err)),
            Ok(value) => value,
        }
    };
}
29
/// A `LazyError` whose inner error type is `std::io::Error`.
///
/// `'a` is the lifetime of the (possibly borrowed) field name carried by the error.
pub type LazyIoError<'a> = LazyError<'a, io::Error>;
32
/// `Result` alias whose error type is `LazyIoError<'a>`.
pub type LazyIoResult<'a, T> = Result<T, LazyIoError<'a>>;
35
/// An error for lazily written multipart requests, including the original error as well
/// as the field which caused the error, if applicable.
///
/// `'a` is the lifetime of the (possibly borrowed) field name; `E` is the type of the
/// wrapped error.
pub struct LazyError<'a, E> {
    /// The name of the field that caused the error.
    /// If `None`, there was a problem opening the stream to write or finalizing the stream.
    pub field_name: Option<Cow<'a, str>>,
    /// The inner error.
    pub error: E,
    /// Private field for back-compat. Because it is private, code outside this module
    /// cannot build a `LazyError` with a struct literal.
    _priv: (),
}
47
48impl<'a, E> LazyError<'a, E> {
49    fn without_field<E_: Into<E>>(error: E_) -> Self {
50        LazyError {
51            field_name: None,
52            error: error.into(),
53            _priv: (),
54        }
55    }
56
57    fn with_field<E_: Into<E>>(field_name: Cow<'a, str>, error: E_) -> Self {
58        LazyError {
59            field_name: Some(field_name),
60            error: error.into(),
61            _priv: (),
62        }
63    }
64
65    fn transform_err<E_: From<E>>(self) -> LazyError<'a, E_> {
66        LazyError {
67            field_name: self.field_name,
68            error: self.error.into(),
69            _priv: (),
70        }
71    }
72}
73
74/// Take `self.error`, discarding `self.field_name`.
75impl<'a> Into<io::Error> for LazyError<'a, io::Error> {
76    fn into(self) -> io::Error {
77        self.error
78    }
79}
80
81impl<'a, E: Error> Error for LazyError<'a, E> {
82    fn description(&self) -> &str {
83        self.error.description()
84    }
85
86    fn cause(&self) -> Option<&dyn Error> {
87        Some(&self.error)
88    }
89}
90
91impl<'a, E: fmt::Debug> fmt::Debug for LazyError<'a, E> {
92    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
93        if let Some(ref field_name) = self.field_name {
94            fmt.write_fmt(format_args!(
95                "LazyError (on field {:?}): {:?}",
96                field_name, self.error
97            ))
98        } else {
99            fmt.write_fmt(format_args!("LazyError (misc): {:?}", self.error))
100        }
101    }
102}
103
104impl<'a, E: fmt::Display> fmt::Display for LazyError<'a, E> {
105    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
106        if let Some(ref field_name) = self.field_name {
107            fmt.write_fmt(format_args!(
108                "Error writing field {:?}: {}",
109                field_name, self.error
110            ))
111        } else {
112            fmt.write_fmt(format_args!(
113                "Error opening or flushing stream: {}",
114                self.error
115            ))
116        }
117    }
118}
119
/// A multipart request which writes all fields at once upon being provided an output stream.
///
/// Sacrifices static dispatch for support for dynamic construction. Reusable.
///
/// Fields are only collected here; `prepare()` hands them to `PreparedFields::from_fields`,
/// which drains the list while serializing it.
///
/// #### Lifetimes
/// * `'n`: Lifetime for field **n**ames; will only escape this struct in `LazyIoError<'n>`.
/// * `'d`: Lifetime for **d**ata: will only escape this struct in `PreparedFields<'d>`.
#[derive(Debug, Default)]
pub struct Multipart<'n, 'd> {
    /// Fields added so far, in insertion order.
    fields: Vec<Field<'n, 'd>>,
}
131
132impl<'n, 'd> Multipart<'n, 'd> {
133    /// Initialize a new lazy dynamic request.
134    pub fn new() -> Self {
135        Default::default()
136    }
137
138    /// Add a text field to this request.
139    pub fn add_text<N, T>(&mut self, name: N, text: T) -> &mut Self
140    where
141        N: Into<Cow<'n, str>>,
142        T: Into<Cow<'d, str>>,
143    {
144        self.fields.push(Field {
145            name: name.into(),
146            data: Data::Text(text.into()),
147        });
148
149        self
150    }
151
152    /// Add a file field to this request.
153    ///
154    /// ### Note
155    /// Does not check if `path` exists.
156    pub fn add_file<N, P>(&mut self, name: N, path: P) -> &mut Self
157    where
158        N: Into<Cow<'n, str>>,
159        P: IntoCowPath<'d>,
160    {
161        self.fields.push(Field {
162            name: name.into(),
163            data: Data::File(path.into_cow_path()),
164        });
165
166        self
167    }
168
169    /// Add a generic stream field to this request,
170    pub fn add_stream<N, R, F>(
171        &mut self,
172        name: N,
173        stream: R,
174        filename: Option<F>,
175        mime: Option<Mime>,
176        content_range: Option<F>,
177    ) -> &mut Self
178    where
179        N: Into<Cow<'n, str>>,
180        R: Read + 'd,
181        F: Into<Cow<'n, str>>,
182    {
183        self.fields.push(Field {
184            name: name.into(),
185            data: Data::Stream(Stream {
186                content_type: mime.unwrap_or(mime::APPLICATION_OCTET_STREAM),
187                content_range: content_range.map(|r| r.into()),
188                filename: filename.map(|f| f.into()),
189                stream: Box::new(stream),
190            }),
191        });
192
193        self
194    }
195
196    /// Convert `req` to `HttpStream`, write out the fields in this request, and finish the
197    /// request, returning the response if successful, or the first error encountered.
198    ///
199    /// If any files were added by path they will now be opened for reading.
200    pub fn send<R: HttpRequest>(
201        &mut self,
202        mut req: R,
203    ) -> Result<<R::Stream as HttpStream>::Response, LazyError<'n, <R::Stream as HttpStream>::Error>>
204    {
205        let mut prepared = self.prepare().map_err(LazyError::transform_err)?;
206
207        req.apply_headers(prepared.boundary(), prepared.content_len());
208
209        let mut stream = try_lazy!(req.open_stream());
210
211        try_lazy!(io::copy(&mut prepared, &mut stream));
212
213        stream.finish().map_err(LazyError::without_field)
214    }
215
216    /// Export the multipart data contained in this lazy request as an adaptor which implements `Read`.
217    ///
218    /// During this step, if any files were added by path then they will be opened for reading
219    /// and their length measured.
220    pub fn prepare(&mut self) -> LazyIoResult<'n, PreparedFields<'d>> {
221        PreparedFields::from_fields(&mut self.fields)
222    }
223}
224
/// A single named field queued on a `Multipart` request.
#[derive(Debug)]
struct Field<'n, 'd> {
    /// Field name, written into the part's `Content-Disposition` header.
    name: Cow<'n, str>,
    /// What the field contains: inline text, a file path, or a stream.
    data: Data<'n, 'd>,
}
230
/// The payload of a single field.
enum Data<'n, 'd> {
    /// Inline text, serialized directly into the request body.
    Text(Cow<'d, str>),
    /// Path of a file; the file is opened when the request is prepared.
    File(Cow<'d, Path>),
    /// An arbitrary reader together with the metadata describing it.
    Stream(Stream<'n, 'd>),
}
236
237impl<'n, 'd> fmt::Debug for Data<'n, 'd> {
238    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
239        match *self {
240            Data::Text(ref text) => write!(f, "Data::Text({:?})", text),
241            Data::File(ref path) => write!(f, "Data::File({:?})", path),
242            Data::Stream(_) => f.write_str("Data::Stream(Box<Read>)"),
243        }
244    }
245}
246
/// A generic reader added through `Multipart::add_stream()`, plus its part metadata.
struct Stream<'n, 'd> {
    /// Optional filename advertised in the part's `Content-Disposition` header.
    filename: Option<Cow<'n, str>>,
    /// Value of the part's `Content-Type` header.
    content_type: Mime,
    /// Optional value of the part's `Content-Range` header.
    content_range: Option<Cow<'n, str>>,
    /// The reader supplying the part's body.
    stream: Box<dyn Read + 'd>,
}
253
254/// The result of [`Multipart::prepare()`](struct.Multipart.html#method.prepare).
255///
256/// Implements `Read`, contains the entire request body.
257///
258/// Individual files/streams are dropped as they are read to completion.
259///
260/// ### Note
261/// The fields in the request may have been reordered to simplify the preparation step.
262/// No compliant server implementation will be relying on the specific ordering of fields anyways.
263pub struct PreparedFields<'d> {
264    text_data: Cursor<Vec<u8>>,
265    streams: Vec<PreparedField<'d>>,
266    end_boundary: Cursor<String>,
267    content_len: Option<u64>,
268}
269
270impl<'d> PreparedFields<'d> {
271    fn from_fields<'n>(fields: &mut Vec<Field<'n, 'd>>) -> Result<Self, LazyIoError<'n>> {
272        debug!("Field count: {}", fields.len());
273
274        // One of the two RFCs specifies that any bytes before the first boundary are to be
275        // ignored anyway
276        let mut boundary = format!("\r\n--{}", super::gen_boundary());
277
278        let mut text_data = Vec::new();
279        let mut streams = Vec::new();
280        let mut content_len = 0u64;
281        let mut use_len = true;
282
283        for field in fields.drain(..) {
284            match field.data {
285                Data::Text(text) => write!(
286                    text_data,
287                    "{}\r\nContent-Disposition: form-data; \
288                     name=\"{}\"\r\n\r\n{}",
289                    boundary, field.name, text
290                )
291                .unwrap(),
292                Data::File(file) => {
293                    let (stream, len) = PreparedField::from_path(field.name, &file, &boundary)?;
294                    content_len += len;
295                    streams.push(stream);
296                }
297                Data::Stream(stream) => {
298                    use_len = false;
299
300                    streams.push(PreparedField::from_stream(
301                        &field.name,
302                        &boundary,
303                        &stream.content_type,
304                        stream.content_range.as_ref().map(|r| r.as_ref()),
305                        stream.filename.as_ref().map(|f| &**f),
306                        stream.stream,
307                    ));
308                }
309            }
310        }
311
312        // So we don't write a spurious end boundary
313        if text_data.is_empty() && streams.is_empty() {
314            boundary = String::new();
315        } else {
316            boundary.push_str("--");
317        }
318
319        content_len += boundary.len() as u64;
320
321        Ok(PreparedFields {
322            text_data: Cursor::new(text_data),
323            streams,
324            end_boundary: Cursor::new(boundary),
325            content_len: if use_len { Some(content_len) } else { None },
326        })
327    }
328
329    /// Get the content-length value for this set of fields, if applicable (all fields are sized,
330    /// i.e. not generic streams).
331    pub fn content_len(&self) -> Option<u64> {
332        self.content_len
333    }
334
335    /// Get the boundary that was used to serialize the request.
336    pub fn boundary(&self) -> &str {
337        let boundary = self.end_boundary.get_ref();
338
339        // Get just the bare boundary string
340        &boundary[4..boundary.len() - 2]
341    }
342}
343
344impl<'d> Read for PreparedFields<'d> {
345    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
346        if buf.is_empty() {
347            debug!("PreparedFields::read() was passed a zero-sized buffer.");
348            return Ok(0);
349        }
350
351        let mut total_read = 0;
352
353        while total_read < buf.len() && !cursor_at_end(&self.end_boundary) {
354            let buf = &mut buf[total_read..];
355
356            total_read += if !cursor_at_end(&self.text_data) {
357                self.text_data.read(buf)?
358            } else if let Some(mut field) = self.streams.pop() {
359                match field.read(buf) {
360                    Ok(0) => continue,
361                    res => {
362                        self.streams.push(field);
363                        res
364                    }
365                }?
366            } else {
367                self.end_boundary.read(buf)?
368            };
369        }
370
371        Ok(total_read)
372    }
373}
374
/// A file or stream field that is ready to be read out: its serialized part header
/// followed by the reader that supplies the part's body.
struct PreparedField<'d> {
    /// Boundary line and part headers; read before `stream`.
    header: Cursor<Vec<u8>>,
    /// The part's body; read once `header` is exhausted.
    stream: Box<dyn Read + 'd>,
}
379
380impl<'d> PreparedField<'d> {
381    fn from_path<'n>(
382        name: Cow<'n, str>,
383        path: &Path,
384        boundary: &str,
385    ) -> Result<(Self, u64), LazyIoError<'n>> {
386        let (content_type, filename) = super::mime_filename(&path);
387
388        let file = try_lazy!(name, File::open(path));
389        let content_len = try_lazy!(name, file.metadata()).len();
390
391        let stream = Self::from_stream(
392            &name,
393            boundary,
394            &content_type,
395            None,
396            filename,
397            Box::new(file),
398        );
399
400        let content_len = content_len + (stream.header.get_ref().len() as u64);
401
402        Ok((stream, content_len))
403    }
404
405    fn from_stream(
406        name: &str,
407        boundary: &str,
408        content_type: &Mime,
409        content_range: Option<&str>,
410        filename: Option<&str>,
411        stream: Box<dyn Read + 'd>,
412    ) -> Self {
413        let mut header = Vec::new();
414
415        write!(
416            header,
417            "{}\r\nContent-Disposition: form-data; name=\"{}\"",
418            boundary, name
419        )
420        .unwrap();
421
422        if let Some(filename) = filename {
423            write!(header, "; filename=\"{}\"", filename).unwrap();
424        }
425
426        write!(header, "\r\n").unwrap();
427        write!(header, "Content-Type: {}\r\n", content_type).unwrap();
428        if let Some(content_range) = content_range {
429            write!(header, "Content-Range: {}\r\n", content_range).unwrap();
430        }
431        write!(header, "\r\n").unwrap();
432
433        PreparedField {
434            header: Cursor::new(header),
435            stream,
436        }
437    }
438}
439
440impl<'d> Read for PreparedField<'d> {
441    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
442        debug!("PreparedField::read()");
443
444        if !cursor_at_end(&self.header) {
445            self.header.read(buf)
446        } else {
447            self.stream.read(buf)
448        }
449    }
450}
451
452impl<'d> fmt::Debug for PreparedField<'d> {
453    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
454        f.debug_struct("PreparedField")
455            .field("header", &self.header)
456            .field("stream", &"Box<Read>")
457            .finish()
458    }
459}
460
/// Conversion trait that lets `Multipart::add_file()` accept borrowed or owned strings
/// as well as borrowed or owned paths.
pub trait IntoCowPath<'a> {
    /// Convert `self` into a `Cow<'a, Path>`, borrowing when `self` is itself a borrow.
    fn into_cow_path(self) -> Cow<'a, Path>;
}

/// Already the target type; returned unchanged.
impl<'a> IntoCowPath<'a> for Cow<'a, Path> {
    fn into_cow_path(self) -> Cow<'a, Path> {
        self
    }
}

/// Owned path; moved into the `Cow` without copying.
impl IntoCowPath<'static> for PathBuf {
    fn into_cow_path(self) -> Cow<'static, Path> {
        Cow::Owned(self)
    }
}

/// Borrowed path; stays borrowed.
impl<'a> IntoCowPath<'a> for &'a Path {
    fn into_cow_path(self) -> Cow<'a, Path> {
        Cow::Borrowed(self)
    }
}

/// Owned string; converted into an owned path.
impl IntoCowPath<'static> for String {
    fn into_cow_path(self) -> Cow<'static, Path> {
        Cow::Owned(PathBuf::from(self))
    }
}

/// Borrowed string; viewed as a borrowed path.
impl<'a> IntoCowPath<'a> for &'a str {
    fn into_cow_path(self) -> Cow<'a, Path> {
        Cow::Borrowed(Path::new(self))
    }
}
497
/// Returns `true` when `cursor` has no more bytes to yield.
///
/// A cursor whose position lies beyond the end of its buffer also counts as being at
/// the end, since reading from it returns nothing.
fn cursor_at_end<T: AsRef<[u8]>>(cursor: &Cursor<T>) -> bool {
    let len = cursor.get_ref().as_ref().len() as u64;
    cursor.position() >= len
}
501
#[cfg(feature = "hyper")]
mod hyper {
    use hyper::client::{Body, Client, IntoUrl, RequestBuilder, Response};
    use hyper::Result as HyperResult;

    impl<'n, 'd> super::Multipart<'n, 'd> {
        /// #### Feature: `hyper`
        /// Complete a POST request with the given `hyper::client::Client` and URL.
        ///
        /// Supplies the fields in the body, optionally setting the content-length header if
        /// applicable (all added fields were text or files, i.e. no streams).
        pub fn client_request<U: IntoUrl>(
            &mut self,
            client: &Client,
            url: U,
        ) -> HyperResult<Response> {
            // No customization requested: hand the builder back untouched.
            self.client_request_mut(client, url, |builder| builder)
        }

        /// #### Feature: `hyper`
        /// Complete a POST request with the given `hyper::client::Client` and URL;
        /// allows mutating the `hyper::client::RequestBuilder` via the passed closure.
        ///
        /// Note that the body, and the `ContentType` and `ContentLength` headers will be
        /// overwritten, either by this method or by Hyper.
        pub fn client_request_mut<U: IntoUrl, F: FnOnce(RequestBuilder) -> RequestBuilder>(
            &mut self,
            client: &Client,
            url: U,
            mut_fn: F,
        ) -> HyperResult<Response> {
            // Log a preparation failure, then surface only the inner I/O error; `?`
            // converts it into Hyper's error type.
            let mut fields = self.prepare().map_err(|err| {
                error!("Error preparing request: {}", err);
                err.error
            })?;

            mut_fn(client.post(url))
                .header(::client::hyper::content_type(fields.boundary()))
                .body(fields.to_body())
                .send()
        }
    }

    impl<'d> super::PreparedFields<'d> {
        /// #### Feature: `hyper`
        /// Convert `self` to `hyper::client::Body`.
        ///
        /// Produces a sized body when the content length is known and a chunked body otherwise.
        #[cfg_attr(feature = "clippy", warn(wrong_self_convention))]
        pub fn to_body<'b>(&'b mut self) -> Body<'b>
        where
            'd: 'b,
        {
            match self.content_len {
                Some(content_len) => Body::SizedBody(self, content_len),
                None => Body::ChunkedBody(self),
            }
        }
    }
}