multipart_client_stream/
lib.rs

1mod internal;
2
3use std::{collections::VecDeque, io::Cursor, pin::Pin};
4
5use bytes::Bytes;
6use fastrand::Rng;
7use futures_core::Stream;
8use internal::{SendRead, UnsendRead};
9use mime::Mime;
10use tokio::io::AsyncRead;
11use tokio_util::io::ReaderStream;
12
13pub struct Body<R> {
14    stream: ReaderStream<internal::Body<R>>,
15    boundary: String,
16}
17
18pub struct BodyBuilder<R> {
19    boundary: String,
20    parts: Vec<Part<R>>,
21}
22
23pub struct Part<R> {
24    name: String,
25    content_type: Option<Mime>,
26    filename: Option<String>,
27    reader: R,
28}
29
30#[derive(Debug)]
31pub struct Empty;
32
33impl<'a> Body<SendRead<'a>> {
34    pub fn builder() -> BodyBuilder<SendRead<'a>> {
35        let mut rng = Rng::new();
36        let boundary = (0..6).map(|_| rng.alphanumeric()).collect::<String>();
37
38        BodyBuilder {
39            boundary,
40            parts: Vec::new(),
41        }
42    }
43
44    pub fn content_type(&self) -> mime::Mime {
45        format!("multipart/form-data; boundary={}", self.boundary)
46            .parse()
47            .expect("Valid mime for content_type")
48    }
49}
50
51impl<'a> BodyBuilder<SendRead<'a>> {
52    pub fn boundary<S: Into<String>>(mut self, boundary: S) -> Self {
53        self.boundary = boundary.into();
54        self
55    }
56
57    pub fn append(mut self, part: Part<SendRead<'a>>) -> Self {
58        self.parts.push(part);
59        self
60    }
61
62    pub fn append_unsend(self, part: Part<UnsendRead<'a>>) -> BodyBuilder<UnsendRead<'a>> {
63        let mut parts: Vec<_> = self
64            .parts
65            .into_iter()
66            .map(Part::<UnsendRead<'a>>::from)
67            .collect();
68
69        parts.push(part);
70
71        BodyBuilder {
72            boundary: self.boundary,
73            parts,
74        }
75    }
76
77    pub fn build(self) -> Body<SendRead<'a>> {
78        let parts = self
79            .parts
80            .into_iter()
81            .map(Part::<SendRead<'a>>::build)
82            .collect();
83
84        Body {
85            stream: ReaderStream::new(internal::Body::new(self.boundary.clone(), parts)),
86            boundary: self.boundary,
87        }
88    }
89}
90
91impl<'a> BodyBuilder<UnsendRead<'a>> {
92    pub fn boundary<S: Into<String>>(mut self, boundary: S) -> Self {
93        self.boundary = boundary.into();
94        self
95    }
96
97    pub fn append(mut self, part: Part<SendRead<'a>>) -> Self {
98        self.parts.push(From::from(part));
99        self
100    }
101
102    pub fn append_unsend(mut self, part: Part<UnsendRead<'a>>) -> BodyBuilder<UnsendRead<'a>> {
103        self.parts.push(part);
104        self
105    }
106
107    pub fn build(self) -> Body<UnsendRead<'a>> {
108        let parts: VecDeque<internal::Part<UnsendRead<'a>>> = self
109            .parts
110            .into_iter()
111            .map(Part::<UnsendRead<'a>>::build)
112            .collect();
113
114        Body {
115            stream: ReaderStream::new(internal::Body::new(self.boundary.clone(), parts)),
116            boundary: self.boundary,
117        }
118    }
119}
120
121fn encode(value: String) -> String {
122    value.replace('"', "\\\"")
123}
124
125impl<'a> Part<SendRead<'a>> {
126    pub fn new<S: Into<String>, R: AsyncRead + Send + 'a>(name: S, reader: R) -> Self {
127        Part {
128            name: name.into(),
129            content_type: None,
130            filename: None,
131            reader: SendRead(Box::pin(reader)),
132        }
133    }
134
135    pub fn new_unsend<S: Into<String>, R: AsyncRead + 'a>(
136        name: S,
137        reader: R,
138    ) -> Part<UnsendRead<'a>> {
139        Part {
140            name: name.into(),
141            content_type: None,
142            filename: None,
143            reader: UnsendRead(Box::pin(reader)),
144        }
145    }
146
147    pub fn new_str<S: Into<String>>(name: S, text: &'a str) -> Self {
148        Self::new(name, text.as_bytes()).content_type(mime::TEXT_PLAIN)
149    }
150
151    pub fn new_string<S1: Into<String>, S2: Into<String>>(name: S1, text: S2) -> Self {
152        Self::new(name, Cursor::new(text.into())).content_type(mime::TEXT_PLAIN)
153    }
154
155    pub fn content_type(mut self, content_type: mime::Mime) -> Self {
156        self.content_type = Some(content_type);
157        self
158    }
159
160    pub fn filename<S: Into<String>>(mut self, filename: S) -> Self {
161        self.filename = Some(filename.into());
162        self
163    }
164
165    fn build(self) -> internal::Part<SendRead<'a>> {
166        let content_type = self.content_type.unwrap_or(mime::APPLICATION_OCTET_STREAM);
167
168        let name = encode(self.name);
169        let filename = self.filename.map(encode);
170
171        let content_disposition = if let Some(filename) = filename {
172            format!("form-data; name=\"{name}\"; filename=\"{filename}\"")
173        } else {
174            format!("form-data; name=\"{name}\"")
175        };
176
177        internal::Part::<SendRead<'a>>::new(
178            content_type.to_string(),
179            content_disposition,
180            self.reader,
181        )
182    }
183}
184impl<'a> Part<UnsendRead<'a>> {
185    pub fn content_type(mut self, content_type: mime::Mime) -> Self {
186        self.content_type = Some(content_type);
187        self
188    }
189
190    pub fn filename<S: Into<String>>(mut self, filename: S) -> Self {
191        self.filename = Some(filename.into());
192        self
193    }
194
195    fn build(self) -> internal::Part<UnsendRead<'a>> {
196        let content_type = self.content_type.unwrap_or(mime::APPLICATION_OCTET_STREAM);
197
198        let name = encode(self.name);
199        let filename = self.filename.map(encode);
200
201        let content_disposition = if let Some(filename) = filename {
202            format!("form-data; name=\"{name}\"; filename=\"{filename}\"")
203        } else {
204            format!("form-data; name=\"{name}\"")
205        };
206
207        internal::Part::<UnsendRead<'a>>::new(
208            content_type.to_string(),
209            content_disposition,
210            self.reader,
211        )
212    }
213}
214
215impl<R> Stream for Body<R>
216where
217    R: AsyncRead + Unpin,
218{
219    type Item = std::io::Result<Bytes>;
220
221    fn poll_next(
222        self: Pin<&mut Self>,
223        cx: &mut std::task::Context<'_>,
224    ) -> std::task::Poll<Option<Self::Item>> {
225        Pin::new(&mut self.get_mut().stream).poll_next(cx)
226    }
227}
228
229impl<'a> From<Part<SendRead<'a>>> for Part<UnsendRead<'a>> {
230    fn from(value: Part<SendRead<'a>>) -> Self {
231        Self {
232            name: value.name,
233            content_type: value.content_type,
234            filename: value.filename,
235            reader: UnsendRead::from(value.reader),
236        }
237    }
238}
239
240#[cfg(test)]
241mod tests {
242    use std::{future::poll_fn, io::Cursor, pin::Pin};
243
244    struct Streamer<S>(S);
245
246    impl<S> Streamer<S> {
247        async fn next(&mut self) -> Option<S::Item>
248        where
249            S: futures_core::Stream + Unpin,
250        {
251            poll_fn(|cx| Pin::new(&mut self.0).poll_next(cx)).await
252        }
253    }
254
255    fn assert_stream<S: futures_core::Stream>() {}
256
257    #[test]
258    fn impl_asserts() {
259        assert_stream::<super::Body<super::SendRead<'static>>>();
260        assert_stream::<super::Body<super::UnsendRead<'static>>>();
261    }
262
263    #[tokio::test]
264    async fn build_text() {
265        let body = super::Body::builder()
266            .boundary(String::from("hello"))
267            .append(super::Part::new_str(String::from("first_name"), "John"))
268            .append(super::Part::new_str(String::from("last_name"), "Doe"))
269            .build();
270
271        let mut out = Vec::new();
272
273        let mut streamer = Streamer(body);
274
275        while let Some(res) = streamer.next().await {
276            out.extend(res.expect("read success"));
277        }
278
279        let out = String::from_utf8(out).expect("Valid string");
280
281        assert_eq!(out, "--hello\r\ncontent-type: text/plain\r\ncontent-disposition: form-data; name=\"first_name\"\r\n\r\nJohn\r\n--hello\r\ncontent-type: text/plain\r\ncontent-disposition: form-data; name=\"last_name\"\r\n\r\nDoe\r\n--hello--\r\n")
282    }
283
284    #[tokio::test]
285    async fn test_form_body_stream() {
286        let path = "./files/Shenzi.webp";
287        let file = tokio::fs::File::open(path).await.expect("Opened file");
288
289        let body = super::Body::builder()
290            .boundary(String::from("hello"))
291            .append(super::Part::new_str("name1", "value1"))
292            .append(super::Part::new_str("name2", "value2"))
293            .append(
294                super::Part::new("images[]", file)
295                    .filename("Shenzi.webp")
296                    .content_type("image/webp".parse().expect("Parsed mime")),
297            )
298            .append(super::Part::new("input", Cursor::new("Hello World!")))
299            .build();
300
301        let mut out = Vec::new();
302
303        let mut streamer = Streamer(body);
304
305        while let Some(res) = streamer.next().await {
306            out.extend(res.expect("read success"));
307        }
308
309        let file_contents = tokio::fs::read(path).await.expect("Read file");
310
311        let expected = [
312            &b"--hello\r\n"[..],
313            &b"content-type: text/plain\r\n"[..],
314            &b"content-disposition: form-data; name=\"name1\"\r\n"[..],
315            &b"\r\n"[..],
316            &b"value1\r\n"[..],
317            &b"--hello\r\n"[..],
318            &b"content-type: text/plain\r\n"[..],
319            &b"content-disposition: form-data; name=\"name2\"\r\n"[..],
320            &b"\r\n"[..],
321            &b"value2\r\n"[..],
322            &b"--hello\r\n"[..],
323            &b"content-type: image/webp\r\n"[..],
324            &b"content-disposition: form-data; name=\"images[]\"; filename=\"Shenzi.webp\"\r\n"[..],
325            &b"\r\n"[..],
326            &file_contents[..],
327            &b"\r\n"[..],
328            &b"--hello\r\n"[..],
329            &b"content-type: application/octet-stream\r\n"[..],
330            &b"content-disposition: form-data; name=\"input\"\r\n"[..],
331            &b"\r\n"[..],
332            &b"Hello World!\r\n"[..],
333            &b"--hello--\r\n"[..],
334        ]
335        .into_iter()
336        .map(|s| s.iter().copied())
337        .flatten()
338        .collect::<Vec<u8>>();
339
340        if out != expected {
341            tokio::fs::write("./simple.actual.multipart", out)
342                .await
343                .expect("Wrote file");
344            tokio::fs::write("./simple.expected.multipart", expected)
345                .await
346                .expect("Wrote file");
347            panic!("No match")
348        }
349    }
350
351    struct MyBoundary;
352    impl common_multipart_rfc7578::client::multipart::BoundaryGenerator for MyBoundary {
353        fn generate_boundary() -> String {
354            String::from("MYBOUNDARY")
355        }
356    }
357
358    #[tokio::test]
359    async fn compare_feristseng() {
360        let mut form = common_multipart_rfc7578::client::multipart::Form::new::<MyBoundary>();
361        let path1 = "./files/Shenzi.webp";
362        let path2 = "./files/Shenzi.png";
363        form.add_text("meowdy", "y'all");
364        form.add_reader_file_with_mime(
365            "images[]",
366            std::fs::File::open(path1).expect("Opened file"),
367            "Shenzi.webp",
368            "image/webp".parse().expect("Valid mime"),
369        );
370        form.add_reader("images[]", std::fs::File::open(path2).expect("Opened file"));
371        form.add_text("howdy", "y'all");
372
373        let body = super::Body::builder()
374            .boundary("MYBOUNDARY")
375            .append(super::Part::new_str("meowdy", "y'all"))
376            .append(
377                super::Part::new(
378                    "images[]",
379                    tokio::fs::File::open(path1).await.expect("Opened file"),
380                )
381                .filename("Shenzi.webp")
382                .content_type("image/webp".parse().expect("Valid mime")),
383            )
384            .append(super::Part::new(
385                "images[]",
386                tokio::fs::File::open(path2).await.expect("Opened file"),
387            ))
388            .append(super::Part::new_str("howdy", "y'all"))
389            .build();
390
391        let mut ferrisbody: Streamer<common_multipart_rfc7578::client::multipart::Body> =
392            Streamer(form.into());
393        let mut mybody = Streamer(body);
394
395        let mut ferriststeng = Vec::new();
396        let mut mine = Vec::new();
397
398        while let Some(res) = ferrisbody.next().await {
399            ferriststeng.extend(res.expect("read success"));
400        }
401        while let Some(res) = mybody.next().await {
402            mine.extend(res.expect("read success"));
403        }
404
405        if mine != ferriststeng {
406            tokio::fs::write("./compare.actual.multipart", mine)
407                .await
408                .expect("Wrote file");
409            tokio::fs::write("./compare.expected.multipart", ferriststeng)
410                .await
411                .expect("Wrote file");
412            panic!("No match")
413        }
414    }
415
416    #[test]
417    fn encode() {
418        let cases = [("hello", "hello"), ("Hello \"John\"", "Hello \\\"John\\\"")];
419
420        for (input, expected) in cases {
421            let output = super::encode(String::from(input));
422
423            assert_eq!(output, expected);
424        }
425    }
426}