http_client_multipart/
multipart.rs1use crate::{generate_boundary, part::Part, Encoding, StreamChunk};
2use futures_lite::{AsyncBufRead, AsyncReadExt, Stream, StreamExt};
3use http_types::{Body, Request, Result};
4use std::{
5 borrow::Cow,
6 io::{Read, Seek},
7 path::Path,
8 pin::Pin,
9};
10
11#[derive(Debug)]
13pub struct Multipart<'m> {
14 boundary: String,
15 fields: Vec<Part<'m>>,
16}
17
18impl Default for Multipart<'_> {
19 fn default() -> Self {
20 Self::new()
21 }
22}
23
24impl<'m> Multipart<'m> {
25 pub fn new() -> Self {
27 Self {
28 boundary: generate_boundary(),
29 fields: Vec::new(),
30 }
31 }
32
33 pub fn add_text(&mut self, name: impl Into<Cow<'m, str>>, value: impl AsRef<str>) {
35 self.fields.push(Part::text(name, value.as_ref()));
36 }
37
38 pub async fn add_file(
40 &mut self,
41 name: impl Into<Cow<'m, str>>,
42 path: impl AsRef<Path>,
43 encoding: Option<Encoding>,
44 ) -> Result<()> {
45 let part = Part::file_async(name, path, encoding).await?;
46 self.fields.push(part);
47 Ok(())
48 }
49
50 pub fn add_async_read(
52 &mut self,
53 name: impl Into<Cow<'m, str>>,
54 filename: impl Into<Cow<'m, str>>,
55 content_type: &str,
56 encoding: Option<Encoding>,
57 data: impl AsyncBufRead + Unpin + Send + Sync + 'static,
58 data_len: Option<usize>, ) -> Result<()> {
60 self.fields.push(Part::file_raw_async(
61 name,
62 filename,
63 content_type.parse()?,
64 encoding,
65 data,
66 data_len,
67 ));
68 Ok(())
69 }
70
71 pub fn add_sync_read(
73 &mut self,
74 name: impl Into<Cow<'m, str>>,
75 filename: impl Into<Cow<'m, str>>,
76 content_type: &str,
77 encoding: Option<Encoding>,
78 mut data: impl Read + Seek + Send + 'static,
79 ) -> Result<()> {
80 let mut buffer = Vec::new();
81 data.read_to_end(&mut buffer)?;
82 let body = Body::from(buffer);
83 self.fields.push(Part::file_raw(
84 name,
85 filename,
86 content_type.parse()?,
87 encoding,
88 body,
89 ));
90 Ok(())
91 }
92
93 pub fn set_request(self, req: &mut Request) {
95 let content_type = format!("multipart/form-data; boundary={}", &self.boundary);
96 req.insert_header("Content-Type", content_type);
97 let body = self.into_body(None);
98 req.set_body(body);
99 }
100
101 pub async fn into_bytes(self) -> Result<Vec<u8>> {
103 let mut data: Vec<u8> = Vec::new();
104
105 for field in self.fields {
106 data.extend(format!("--{}\r\n", self.boundary).into_bytes());
108 field.extend(&mut data).await?;
109 }
110 data.extend_from_slice(b"\r\n");
111
112 data.extend(format!("--{}--\r\n", self.boundary).into_bytes());
114
115 Ok(data)
116 }
117
118 pub fn into_stream(self, buf_size: Option<usize>) -> impl Stream<Item = StreamChunk> {
119 if self.fields.is_empty() {
120 let empty_stream: Pin<Box<dyn Stream<Item = StreamChunk>>> =
121 Box::pin(futures_lite::stream::empty());
122 return empty_stream;
123 }
124
125 let head_bytes = format!("--{}\r\n", self.boundary).into_bytes();
126 let head_stream = futures_lite::stream::once(Ok(head_bytes.clone()));
127 let seperator = format!("\r\n--{}\r\n", self.boundary).into_bytes();
128 let mut field_iter = self.fields.into_iter();
129 let start = field_iter.next().unwrap().into_stream(buf_size);
130 let start = Box::pin(head_stream.chain(start)) as Pin<Box<dyn Stream<Item = StreamChunk>>>;
131 let stream = field_iter.fold(start, |acc, field| {
132 let seperator = futures_lite::stream::once(Ok(seperator.clone()));
133 let stream = field.into_stream(buf_size);
134 Box::pin(acc.chain(seperator).chain(stream)) as Pin<Box<dyn Stream<Item = StreamChunk>>>
135 });
136 let tail = format!("\r\n--{}--\r\n", self.boundary).into_bytes();
137 let end = futures_lite::stream::once(Ok(tail));
138 Box::pin(stream.chain(end)) as Pin<Box<dyn Stream<Item = StreamChunk>>>
139 }
140
141 pub fn into_reader(self, buf_size: Option<usize>) -> impl AsyncBufRead + Send + Sync {
142 if self.fields.is_empty() {
143 return Box::pin(futures_lite::io::empty()) as Pin<Box<dyn AsyncBufRead + Send + Sync>>;
144 }
145
146 let head_bytes = format!("--{}\r\n", self.boundary).into_bytes();
147 let header_reader = futures_lite::io::Cursor::new(head_bytes.clone());
148 let seperator = format!("\r\n--{}\r\n", self.boundary).into_bytes();
149
150 let mut field_iter = self.fields.into_iter();
151 let start = field_iter.next().unwrap().into_reader(buf_size);
152 let start =
153 Box::pin(header_reader.chain(start)) as Pin<Box<dyn AsyncBufRead + Send + Sync>>;
154 let reader = field_iter.fold(start, |acc, field| {
155 let seperator = futures_lite::io::Cursor::new(seperator.clone());
156 let reader = field.into_reader(buf_size);
157 Box::pin(acc.chain(seperator).chain(reader)) as Pin<Box<dyn AsyncBufRead + Send + Sync>>
158 });
159 let tail = format!("\r\n--{}--\r\n", self.boundary).into_bytes();
160 let end = futures_lite::io::Cursor::new(tail);
161 Box::pin(reader.chain(end)) as Pin<Box<dyn AsyncBufRead + Send + Sync>>
162 }
163
164 fn size_hint(&self) -> Option<usize> {
165 let mut size = 34;
171 for field in &self.fields {
172 size += 36;
173 size += field.size_hint()?;
174 }
175 size += 38;
176 Some(size)
177 }
178
179 fn into_body(self, buf_size: Option<usize>) -> Body {
180 let hint = self.size_hint();
181 Body::from_reader(self.into_reader(buf_size), hint)
182 }
183}
184
185#[cfg(test)]
186mod tests {
187 use super::*;
188
189 fn create_multipart<'a>() -> Multipart<'a> {
191 let mut m = Multipart::new();
192 m.boundary = "test-boundary".into();
194 m.add_text("field1", "value1");
195 m.add_text("field2", "value2");
196 m
197 }
198
199 #[async_std::test]
200 async fn test_stream_and_reader_equivalence() -> Result<()> {
201 let m_stream = create_multipart();
203 let m_reader = create_multipart();
204
205 let mut stream = m_stream.into_stream(Some(8));
207 let mut stream_output = Vec::new();
208 while let Some(chunk) = stream.next().await {
209 stream_output.extend(chunk?);
210 }
211
212 let mut reader = m_reader.into_reader(Some(8));
214 let mut reader_output = Vec::new();
215 reader.read_to_end(&mut reader_output).await?;
216
217 assert_eq!(stream_output, reader_output);
219
220 Ok(())
221 }
222}