detritus_protocol/
multipart.rs1use std::fmt::Write as _;
4
5use bytes::Bytes;
6use futures_util::stream;
7use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
8
9use crate::{
10 PROTOCOL_VERSION,
11 crash::{CrashAttachment, CrashEnvelope, CrashMetadata, ProtocolError},
12};
13
14pub const DEFAULT_BOUNDARY: &str = "detritus-boundary-v1";
16const CRLF: &str = "\r\n";
17
18#[derive(Debug, Clone, Default)]
20pub struct PartEncoding {
21 pub content_encoding: Option<String>,
25}
26
27impl CrashEnvelope {
28 pub async fn write_to<W>(&self, writer: &mut W) -> Result<(), ProtocolError>
30 where
31 W: AsyncWrite + Unpin,
32 {
33 self.write_to_with_boundary(writer, DEFAULT_BOUNDARY).await
34 }
35
36 pub async fn write_to_with_boundary<W>(
38 &self,
39 writer: &mut W,
40 boundary: &str,
41 ) -> Result<(), ProtocolError>
42 where
43 W: AsyncWrite + Unpin,
44 {
45 let encodings = EnvelopeEncodings::none();
46 self.write_to_with_boundary_and_encodings(writer, boundary, &encodings)
47 .await
48 }
49
50 pub async fn write_to_with_boundary_and_encodings<W>(
53 &self,
54 writer: &mut W,
55 boundary: &str,
56 encodings: &EnvelopeEncodings,
57 ) -> Result<(), ProtocolError>
58 where
59 W: AsyncWrite + Unpin,
60 {
61 let metadata = serde_json::to_vec(&self.metadata)?;
62 write_part(
63 writer,
64 boundary,
65 "metadata",
66 "application/json",
67 None,
68 &metadata,
69 )
70 .await?;
71 write_part(
72 writer,
73 boundary,
74 "dump",
75 "application/octet-stream",
76 encodings.dump.content_encoding.as_deref(),
77 &self.dump,
78 )
79 .await?;
80 for (idx, attachment) in self.attachments.iter().enumerate() {
81 let name = format!("attach:{}", attachment.key);
82 let encoding = encodings
83 .attachments
84 .get(idx)
85 .and_then(|e| e.content_encoding.as_deref());
86 write_part(
87 writer,
88 boundary,
89 &name,
90 &attachment.content_type,
91 encoding,
92 &attachment.bytes,
93 )
94 .await?;
95 }
96 writer
97 .write_all(format!("--{boundary}--{CRLF}").as_bytes())
98 .await?;
99 writer.flush().await?;
100 Ok(())
101 }
102
103 pub async fn read_from<R>(reader: &mut R) -> Result<Self, ProtocolError>
105 where
106 R: AsyncRead + Unpin,
107 {
108 Self::read_from_with_boundary(reader, DEFAULT_BOUNDARY).await
109 }
110
111 pub async fn read_from_with_boundary<R>(
113 reader: &mut R,
114 boundary: &str,
115 ) -> Result<Self, ProtocolError>
116 where
117 R: AsyncRead + Unpin,
118 {
119 let mut body = Vec::new();
120 reader.read_to_end(&mut body).await?;
121 let body = Bytes::from(body);
122 let stream = stream::once(async move { Ok::<Bytes, std::io::Error>(body) });
123 let mut multipart = multer::Multipart::new(stream, boundary);
124 let mut metadata = None;
125 let mut dump = None;
126 let mut attachments = Vec::new();
127
128 while let Some(field) = multipart.next_field().await? {
129 let name = field
130 .name()
131 .ok_or(ProtocolError::InvalidPartName)?
132 .to_owned();
133 let content_type = field.content_type().map_or_else(
134 || "application/octet-stream".to_owned(),
135 ToString::to_string,
136 );
137 let bytes = field.bytes().await?.to_vec();
138 match name.as_str() {
139 "metadata" => metadata = Some(serde_json::from_slice::<CrashMetadata>(&bytes)?),
140 "dump" => dump = Some(bytes),
141 name if name.starts_with("attach:") => {
142 let key = name
143 .strip_prefix("attach:")
144 .ok_or_else(|| ProtocolError::InvalidMultipart(name.to_owned()))?
145 .to_owned();
146 attachments.push(CrashAttachment {
147 key,
148 content_type,
149 bytes,
150 });
151 }
152 _ => return Err(ProtocolError::InvalidMultipart(name)),
153 }
154 }
155
156 let metadata = metadata.ok_or(ProtocolError::MissingPart("metadata"))?;
157 if metadata.schema_version != PROTOCOL_VERSION {
158 return Err(ProtocolError::InvalidMultipart(format!(
159 "schema version {} does not match protocol version {}",
160 metadata.schema_version, PROTOCOL_VERSION
161 )));
162 }
163 let dump = dump.ok_or(ProtocolError::MissingPart("dump"))?;
164 Ok(Self {
165 metadata,
166 dump,
167 attachments,
168 })
169 }
170}
171
172#[derive(Debug, Clone, Default)]
178pub struct EnvelopeEncodings {
179 pub dump: PartEncoding,
181 pub attachments: Vec<PartEncoding>,
184}
185
186impl EnvelopeEncodings {
187 #[must_use]
189 pub fn none() -> Self {
190 Self::default()
191 }
192}
193
194async fn write_part<W>(
195 writer: &mut W,
196 boundary: &str,
197 name: &str,
198 content_type: &str,
199 content_encoding: Option<&str>,
200 bytes: &[u8],
201) -> Result<(), ProtocolError>
202where
203 W: AsyncWrite + Unpin,
204{
205 writer
206 .write_all(format!("--{boundary}{CRLF}").as_bytes())
207 .await?;
208 let mut headers = format!(
209 "Content-Disposition: form-data; name=\"{name}\"{CRLF}Content-Type: {content_type}{CRLF}"
210 );
211 if let Some(encoding) = content_encoding {
212 let _ = write!(headers, "Content-Encoding: {encoding}{CRLF}");
213 }
214 headers.push_str(CRLF);
215 writer.write_all(headers.as_bytes()).await?;
216 writer.write_all(bytes).await?;
217 writer.write_all(CRLF.as_bytes()).await?;
218 Ok(())
219}