Skip to main content

detritus_protocol/
multipart.rs

1//! Feature-gated multipart helpers for crash envelopes.
2
3use std::fmt::Write as _;
4
5use bytes::Bytes;
6use futures_util::stream;
7use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
8
9use crate::{
10    PROTOCOL_VERSION,
11    crash::{CrashAttachment, CrashEnvelope, CrashMetadata, ProtocolError},
12};
13
/// Line terminator used for every multipart delimiter and header line.
const CRLF: &str = "\r\n";
/// Boundary used when the caller does not pick one, e.g. in tests and simple
/// SDK integrations.
pub const DEFAULT_BOUNDARY: &str = "detritus-boundary-v1";
17
/// Per-part encoding options for a multipart crash upload.
///
/// This only controls which `Content-Encoding` header is written for a part;
/// the part's bytes are emitted unchanged, so callers must supply bytes that
/// are already encoded to match the header they request.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PartEncoding {
    /// Value of the `Content-Encoding` header to add to this part, e.g. `"zstd"`.
    ///
    /// `None` means no `Content-Encoding` header is emitted (plain bytes).
    pub content_encoding: Option<String>,
}
26
27impl CrashEnvelope {
28    /// Writes the envelope as an RFC 7578 multipart body with [`DEFAULT_BOUNDARY`].
29    pub async fn write_to<W>(&self, writer: &mut W) -> Result<(), ProtocolError>
30    where
31        W: AsyncWrite + Unpin,
32    {
33        self.write_to_with_boundary(writer, DEFAULT_BOUNDARY).await
34    }
35
36    /// Writes the envelope as an RFC 7578 multipart body.
37    pub async fn write_to_with_boundary<W>(
38        &self,
39        writer: &mut W,
40        boundary: &str,
41    ) -> Result<(), ProtocolError>
42    where
43        W: AsyncWrite + Unpin,
44    {
45        let encodings = EnvelopeEncodings::none();
46        self.write_to_with_boundary_and_encodings(writer, boundary, &encodings)
47            .await
48    }
49
50    /// Writes the envelope as an RFC 7578 multipart body, applying per-part
51    /// `Content-Encoding` headers as specified by `encodings`.
52    pub async fn write_to_with_boundary_and_encodings<W>(
53        &self,
54        writer: &mut W,
55        boundary: &str,
56        encodings: &EnvelopeEncodings,
57    ) -> Result<(), ProtocolError>
58    where
59        W: AsyncWrite + Unpin,
60    {
61        let metadata = serde_json::to_vec(&self.metadata)?;
62        write_part(
63            writer,
64            boundary,
65            "metadata",
66            "application/json",
67            None,
68            &metadata,
69        )
70        .await?;
71        write_part(
72            writer,
73            boundary,
74            "dump",
75            "application/octet-stream",
76            encodings.dump.content_encoding.as_deref(),
77            &self.dump,
78        )
79        .await?;
80        for (idx, attachment) in self.attachments.iter().enumerate() {
81            let name = format!("attach:{}", attachment.key);
82            let encoding = encodings
83                .attachments
84                .get(idx)
85                .and_then(|e| e.content_encoding.as_deref());
86            write_part(
87                writer,
88                boundary,
89                &name,
90                &attachment.content_type,
91                encoding,
92                &attachment.bytes,
93            )
94            .await?;
95        }
96        writer
97            .write_all(format!("--{boundary}--{CRLF}").as_bytes())
98            .await?;
99        writer.flush().await?;
100        Ok(())
101    }
102
103    /// Parses an envelope from a body using [`DEFAULT_BOUNDARY`].
104    pub async fn read_from<R>(reader: &mut R) -> Result<Self, ProtocolError>
105    where
106        R: AsyncRead + Unpin,
107    {
108        Self::read_from_with_boundary(reader, DEFAULT_BOUNDARY).await
109    }
110
111    /// Parses an envelope from a body with the supplied multipart boundary.
112    pub async fn read_from_with_boundary<R>(
113        reader: &mut R,
114        boundary: &str,
115    ) -> Result<Self, ProtocolError>
116    where
117        R: AsyncRead + Unpin,
118    {
119        let mut body = Vec::new();
120        reader.read_to_end(&mut body).await?;
121        let body = Bytes::from(body);
122        let stream = stream::once(async move { Ok::<Bytes, std::io::Error>(body) });
123        let mut multipart = multer::Multipart::new(stream, boundary);
124        let mut metadata = None;
125        let mut dump = None;
126        let mut attachments = Vec::new();
127
128        while let Some(field) = multipart.next_field().await? {
129            let name = field
130                .name()
131                .ok_or(ProtocolError::InvalidPartName)?
132                .to_owned();
133            let content_type = field.content_type().map_or_else(
134                || "application/octet-stream".to_owned(),
135                ToString::to_string,
136            );
137            let bytes = field.bytes().await?.to_vec();
138            match name.as_str() {
139                "metadata" => metadata = Some(serde_json::from_slice::<CrashMetadata>(&bytes)?),
140                "dump" => dump = Some(bytes),
141                name if name.starts_with("attach:") => {
142                    let key = name
143                        .strip_prefix("attach:")
144                        .ok_or_else(|| ProtocolError::InvalidMultipart(name.to_owned()))?
145                        .to_owned();
146                    attachments.push(CrashAttachment {
147                        key,
148                        content_type,
149                        bytes,
150                    });
151                }
152                _ => return Err(ProtocolError::InvalidMultipart(name)),
153            }
154        }
155
156        let metadata = metadata.ok_or(ProtocolError::MissingPart("metadata"))?;
157        if metadata.schema_version != PROTOCOL_VERSION {
158            return Err(ProtocolError::InvalidMultipart(format!(
159                "schema version {} does not match protocol version {}",
160                metadata.schema_version, PROTOCOL_VERSION
161            )));
162        }
163        let dump = dump.ok_or(ProtocolError::MissingPart("dump"))?;
164        Ok(Self {
165            metadata,
166            dump,
167            attachments,
168        })
169    }
170}
171
172/// Per-part encoding settings for an entire [`CrashEnvelope`].
173///
174/// `attachments[i]` corresponds to `envelope.attachments[i]`. If the slice is
175/// shorter than the attachment list the remaining attachments are written with
176/// no `Content-Encoding` header.
177#[derive(Debug, Clone, Default)]
178pub struct EnvelopeEncodings {
179    /// Encoding for the `dump` part.
180    pub dump: PartEncoding,
181    /// Encodings for each `attach:<key>` part, in the same order as
182    /// [`CrashEnvelope::attachments`].
183    pub attachments: Vec<PartEncoding>,
184}
185
186impl EnvelopeEncodings {
187    /// Returns an `EnvelopeEncodings` with no `Content-Encoding` set on any part.
188    #[must_use]
189    pub fn none() -> Self {
190        Self::default()
191    }
192}
193
194async fn write_part<W>(
195    writer: &mut W,
196    boundary: &str,
197    name: &str,
198    content_type: &str,
199    content_encoding: Option<&str>,
200    bytes: &[u8],
201) -> Result<(), ProtocolError>
202where
203    W: AsyncWrite + Unpin,
204{
205    writer
206        .write_all(format!("--{boundary}{CRLF}").as_bytes())
207        .await?;
208    let mut headers = format!(
209        "Content-Disposition: form-data; name=\"{name}\"{CRLF}Content-Type: {content_type}{CRLF}"
210    );
211    if let Some(encoding) = content_encoding {
212        let _ = write!(headers, "Content-Encoding: {encoding}{CRLF}");
213    }
214    headers.push_str(CRLF);
215    writer.write_all(headers.as_bytes()).await?;
216    writer.write_all(bytes).await?;
217    writer.write_all(CRLF.as_bytes()).await?;
218    Ok(())
219}