http_client_multipart/
multipart.rs

1use crate::{generate_boundary, part::Part, Encoding, StreamChunk};
2use futures_lite::{AsyncBufRead, AsyncReadExt, Stream, StreamExt};
3use http_types::{Body, Request, Result};
4use std::{
5    borrow::Cow,
6    io::{Read, Seek},
7    path::Path,
8    pin::Pin,
9};
10
11/// A struct representing a multipart form.
12#[derive(Debug)]
13pub struct Multipart<'m> {
14    boundary: String,
15    fields: Vec<Part<'m>>,
16}
17
18impl Default for Multipart<'_> {
19    fn default() -> Self {
20        Self::new()
21    }
22}
23
24impl<'m> Multipart<'m> {
25    /// Creates a new `Multipart` form with a randomly generated boundary.
26    pub fn new() -> Self {
27        Self {
28            boundary: generate_boundary(),
29            fields: Vec::new(),
30        }
31    }
32
33    /// Adds a text field to the form.
34    pub fn add_text(&mut self, name: impl Into<Cow<'m, str>>, value: impl AsRef<str>) {
35        self.fields.push(Part::text(name, value.as_ref()));
36    }
37
38    /// Adds a file field to the form from path.
39    pub async fn add_file(
40        &mut self,
41        name: impl Into<Cow<'m, str>>,
42        path: impl AsRef<Path>,
43        encoding: Option<Encoding>,
44    ) -> Result<()> {
45        let part = Part::file_async(name, path, encoding).await?;
46        self.fields.push(part);
47        Ok(())
48    }
49
50    /// Adds a file field to the form wrapping a async reader.
51    pub fn add_async_read(
52        &mut self,
53        name: impl Into<Cow<'m, str>>,
54        filename: impl Into<Cow<'m, str>>,
55        content_type: &str,
56        encoding: Option<Encoding>,
57        data: impl AsyncBufRead + Unpin + Send + Sync + 'static,
58        data_len: Option<usize>, // optional length for the async reader, if known
59    ) -> Result<()> {
60        self.fields.push(Part::file_raw_async(
61            name,
62            filename,
63            content_type.parse()?,
64            encoding,
65            data,
66            data_len,
67        ));
68        Ok(())
69    }
70
71    /// Adds a file field to the form wrapping a sync reader.
72    pub fn add_sync_read(
73        &mut self,
74        name: impl Into<Cow<'m, str>>,
75        filename: impl Into<Cow<'m, str>>,
76        content_type: &str,
77        encoding: Option<Encoding>,
78        mut data: impl Read + Seek + Send + 'static,
79    ) -> Result<()> {
80        let mut buffer = Vec::new();
81        data.read_to_end(&mut buffer)?;
82        let body = Body::from(buffer);
83        self.fields.push(Part::file_raw(
84            name,
85            filename,
86            content_type.parse()?,
87            encoding,
88            body,
89        ));
90        Ok(())
91    }
92
93    /// Sets the request body to the multipart form data.
94    pub fn set_request(self, req: &mut Request) {
95        let content_type = format!("multipart/form-data; boundary={}", &self.boundary);
96        req.insert_header("Content-Type", content_type);
97        let body = self.into_body(None);
98        req.set_body(body);
99    }
100
101    /// Converts the multipart form to a `Body`.
102    pub async fn into_bytes(self) -> Result<Vec<u8>> {
103        let mut data: Vec<u8> = Vec::new();
104
105        for field in self.fields {
106            // Add boundary for each field
107            data.extend(format!("--{}\r\n", self.boundary).into_bytes());
108            field.extend(&mut data).await?;
109        }
110        data.extend_from_slice(b"\r\n");
111
112        // Add closing boundary
113        data.extend(format!("--{}--\r\n", self.boundary).into_bytes());
114
115        Ok(data)
116    }
117
118    pub fn into_stream(self, buf_size: Option<usize>) -> impl Stream<Item = StreamChunk> {
119        if self.fields.is_empty() {
120            let empty_stream: Pin<Box<dyn Stream<Item = StreamChunk>>> =
121                Box::pin(futures_lite::stream::empty());
122            return empty_stream;
123        }
124
125        let head_bytes = format!("--{}\r\n", self.boundary).into_bytes();
126        let head_stream = futures_lite::stream::once(Ok(head_bytes.clone()));
127        let seperator = format!("\r\n--{}\r\n", self.boundary).into_bytes();
128        let mut field_iter = self.fields.into_iter();
129        let start = field_iter.next().unwrap().into_stream(buf_size);
130        let start = Box::pin(head_stream.chain(start)) as Pin<Box<dyn Stream<Item = StreamChunk>>>;
131        let stream = field_iter.fold(start, |acc, field| {
132            let seperator = futures_lite::stream::once(Ok(seperator.clone()));
133            let stream = field.into_stream(buf_size);
134            Box::pin(acc.chain(seperator).chain(stream)) as Pin<Box<dyn Stream<Item = StreamChunk>>>
135        });
136        let tail = format!("\r\n--{}--\r\n", self.boundary).into_bytes();
137        let end = futures_lite::stream::once(Ok(tail));
138        Box::pin(stream.chain(end)) as Pin<Box<dyn Stream<Item = StreamChunk>>>
139    }
140
141    pub fn into_reader(self, buf_size: Option<usize>) -> impl AsyncBufRead + Send + Sync {
142        if self.fields.is_empty() {
143            return Box::pin(futures_lite::io::empty()) as Pin<Box<dyn AsyncBufRead + Send + Sync>>;
144        }
145
146        let head_bytes = format!("--{}\r\n", self.boundary).into_bytes();
147        let header_reader = futures_lite::io::Cursor::new(head_bytes.clone());
148        let seperator = format!("\r\n--{}\r\n", self.boundary).into_bytes();
149
150        let mut field_iter = self.fields.into_iter();
151        let start = field_iter.next().unwrap().into_reader(buf_size);
152        let start =
153            Box::pin(header_reader.chain(start)) as Pin<Box<dyn AsyncBufRead + Send + Sync>>;
154        let reader = field_iter.fold(start, |acc, field| {
155            let seperator = futures_lite::io::Cursor::new(seperator.clone());
156            let reader = field.into_reader(buf_size);
157            Box::pin(acc.chain(seperator).chain(reader)) as Pin<Box<dyn AsyncBufRead + Send + Sync>>
158        });
159        let tail = format!("\r\n--{}--\r\n", self.boundary).into_bytes();
160        let end = futures_lite::io::Cursor::new(tail);
161        Box::pin(reader.chain(end)) as Pin<Box<dyn AsyncBufRead + Send + Sync>>
162    }
163
164    fn size_hint(&self) -> Option<usize> {
165        // The first seperator is 30 + 2 + 2 = 34 bytes
166        // The last seperator is 30 + 2 + 2 + 2 + 2 = 38 bytes
167        // The seperator between fields is 30 + 2 + 2 + 2 = 36 bytes
168        // The total size is 34 + 36 * (n - 1) + 38 = 36 * n + 2
169
170        let mut size = 34;
171        for field in &self.fields {
172            size += 36;
173            size += field.size_hint()?;
174        }
175        size += 38;
176        Some(size)
177    }
178
179    fn into_body(self, buf_size: Option<usize>) -> Body {
180        let hint = self.size_hint();
181        Body::from_reader(self.into_reader(buf_size), hint)
182    }
183}
184
185#[cfg(test)]
186mod tests {
187    use super::*;
188
189    // Helper to create a Multipart with fixed boundary and sample text fields.
190    fn create_multipart<'a>() -> Multipart<'a> {
191        let mut m = Multipart::new();
192        // Override the randomly generated boundary for consistency in tests.
193        m.boundary = "test-boundary".into();
194        m.add_text("field1", "value1");
195        m.add_text("field2", "value2");
196        m
197    }
198
199    #[async_std::test]
200    async fn test_stream_and_reader_equivalence() -> Result<()> {
201        // Create two identical Multipart instances.
202        let m_stream = create_multipart();
203        let m_reader = create_multipart();
204
205        // Collect output from the stream implementation.
206        let mut stream = m_stream.into_stream(Some(8));
207        let mut stream_output = Vec::new();
208        while let Some(chunk) = stream.next().await {
209            stream_output.extend(chunk?);
210        }
211
212        // Collect output from the reader implementation.
213        let mut reader = m_reader.into_reader(Some(8));
214        let mut reader_output = Vec::new();
215        reader.read_to_end(&mut reader_output).await?;
216
217        // Compare both outputs.
218        assert_eq!(stream_output, reader_output);
219
220        Ok(())
221    }
222}