//! `modo_upload/stream.rs` — an uploaded file fully buffered in memory,
//! replayable as chunks, an `AsyncRead`, or a single contiguous `Bytes`.
1use crate::file::FieldMeta;
2use bytes::Bytes;
3use futures_util::stream;
4use std::pin::Pin;
5use tokio::io::AsyncRead;
6
/// An uploaded file fully buffered in memory as a sequence of chunks.
///
/// During multipart parsing all chunks are collected into memory before the
/// struct is returned.  The [`chunk()`](Self::chunk) and
/// [`into_reader()`](Self::into_reader) methods replay from this buffer.
pub struct BufferedUpload {
    /// Multipart field name, as reported by the parsed field.
    name: String,
    /// Original filename provided by the client.
    file_name: String,
    /// MIME content type provided by the client.
    content_type: String,
    /// Buffered payload, in arrival order.
    chunks: Vec<Bytes>,
    /// Sum of all chunk lengths in bytes; fixed at construction.
    total_size: usize,
    /// Replay cursor for [`chunk()`](Self::chunk): index of the next chunk.
    pos: usize,
}
20
21impl BufferedUpload {
22    /// Create from an axum multipart field by draining its chunks.
23    #[doc(hidden)]
24    pub async fn from_field(
25        field: axum::extract::multipart::Field<'_>,
26        max_size: Option<usize>,
27    ) -> Result<Self, modo::Error> {
28        let meta = FieldMeta::from_field(&field);
29
30        // Collect chunks from the borrowed field into an owned Vec<Bytes>
31        let mut chunks = Vec::new();
32        let mut total_size: usize = 0;
33        let mut field = field;
34        while let Some(chunk) = field.chunk().await.map_err(|e| {
35            modo::HttpError::BadRequest.with_message(format!("failed to read multipart chunk: {e}"))
36        })? {
37            total_size += chunk.len();
38            if let Some(max) = max_size
39                && total_size > max
40            {
41                return Err(modo::HttpError::PayloadTooLarge
42                    .with_message("upload exceeds maximum allowed size"));
43            }
44            chunks.push(chunk);
45        }
46
47        Ok(Self {
48            name: meta.name,
49            file_name: meta.file_name,
50            content_type: meta.content_type,
51            chunks,
52            total_size,
53            pos: 0,
54        })
55    }
56
57    /// The multipart field name.
58    pub fn name(&self) -> &str {
59        &self.name
60    }
61
62    /// The original filename provided by the client.
63    pub fn file_name(&self) -> &str {
64        &self.file_name
65    }
66
67    /// The MIME content type.
68    pub fn content_type(&self) -> &str {
69        &self.content_type
70    }
71
72    /// Read the next chunk. Returns `None` when all chunks are consumed.
73    pub async fn chunk(&mut self) -> Option<Result<Bytes, std::io::Error>> {
74        if self.pos < self.chunks.len() {
75            let chunk = self.chunks[self.pos].clone();
76            self.pos += 1;
77            Some(Ok(chunk))
78        } else {
79            None
80        }
81    }
82
83    /// Convert into an `AsyncRead` for use with tokio I/O.
84    pub fn into_reader(self) -> Pin<Box<dyn AsyncRead + Send>> {
85        let chunks = self.chunks;
86        let s = stream::iter(chunks.into_iter().map(Ok::<_, std::io::Error>));
87        Box::pin(tokio_util::io::StreamReader::new(s))
88    }
89
90    /// Total size of all collected chunks in bytes.
91    pub fn size(&self) -> usize {
92        self.total_size
93    }
94
95    /// Collapse all chunks into a single contiguous `Bytes`.
96    /// Single allocation sized to total content length.
97    pub fn to_bytes(&self) -> bytes::Bytes {
98        if self.chunks.len() == 1 {
99            return self.chunks[0].clone(); // Bytes::clone is cheap (Arc)
100        }
101        let mut buf = bytes::BytesMut::with_capacity(self.total_size);
102        for chunk in &self.chunks {
103            buf.extend_from_slice(chunk);
104        }
105        buf.freeze()
106    }
107
108    /// Test helper — construct a `BufferedUpload` without multipart parsing.
109    #[doc(hidden)]
110    pub fn __test_new(name: &str, file_name: &str, content_type: &str, chunks: Vec<Bytes>) -> Self {
111        let total_size = chunks.iter().map(|c| c.len()).sum();
112        Self {
113            name: name.to_owned(),
114            file_name: file_name.to_owned(),
115            content_type: content_type.to_owned(),
116            chunks,
117            total_size,
118            pos: 0,
119        }
120    }
121}