1use std::io::{self, Read};
2
3use flate2::read::DeflateDecoder;
4use flate2::write::DeflateEncoder;
5use flate2::Compression;
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8
9const MAGIC: &[u8; 4] = b"IRIS";
10const COMPRESSED_FLAG: u8 = 0x01;
11const COMPRESSION_THRESHOLD: usize = 100;
12
13pub const NEARBY_FRAME_HEADER_BYTES: usize = 13;
14pub const NEARBY_MAX_FRAME_BODY_BYTES: usize = 256 * 1024;
15pub const NEARBY_ENVELOPE_VERSION: u8 = 1;
16
17#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
18pub struct NearbyInventoryItem {
19 pub id: String,
20 #[serde(skip_serializing_if = "Option::is_none")]
21 pub author: Option<String>,
22 pub kind: u64,
23 pub created_at: u64,
24 pub size: u64,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(tag = "type")]
29pub enum NearbyEnvelope {
30 #[serde(rename = "hello")]
31 Hello {
32 v: u8,
33 #[serde(skip_serializing_if = "Option::is_none")]
34 nonce: Option<String>,
35 #[serde(skip_serializing_if = "Option::is_none")]
36 name: Option<String>,
37 },
38 #[serde(rename = "inv")]
39 Inv {
40 v: u8,
41 id: String,
42 #[serde(skip_serializing_if = "Option::is_none")]
43 author: Option<String>,
44 kind: u64,
45 created_at: u64,
46 size: u64,
47 },
48 #[serde(rename = "want")]
49 Want { v: u8, id: String },
50 #[serde(rename = "event")]
51 Event { v: u8, event_json: String },
52}
53
54impl NearbyEnvelope {
55 pub fn hello(nonce: Option<String>, name: Option<String>) -> Self {
56 Self::Hello {
57 v: NEARBY_ENVELOPE_VERSION,
58 nonce,
59 name,
60 }
61 }
62
63 pub fn inv(item: NearbyInventoryItem) -> Self {
64 Self::Inv {
65 v: NEARBY_ENVELOPE_VERSION,
66 id: item.id,
67 author: item.author,
68 kind: item.kind,
69 created_at: item.created_at,
70 size: item.size,
71 }
72 }
73
74 pub fn want(id: impl Into<String>) -> Self {
75 Self::Want {
76 v: NEARBY_ENVELOPE_VERSION,
77 id: id.into(),
78 }
79 }
80
81 pub fn event(event_json: impl Into<String>) -> Self {
82 Self::Event {
83 v: NEARBY_ENVELOPE_VERSION,
84 event_json: event_json.into(),
85 }
86 }
87
88 fn version(&self) -> u8 {
89 match self {
90 Self::Hello { v, .. }
91 | Self::Inv { v, .. }
92 | Self::Want { v, .. }
93 | Self::Event { v, .. } => *v,
94 }
95 }
96}
97
98pub fn encode_nearby_envelope_json(envelope: &NearbyEnvelope) -> Option<String> {
99 if !validate_nearby_envelope(envelope) {
100 return None;
101 }
102 serde_json::to_string(envelope).ok()
103}
104
105pub fn decode_nearby_envelope_json(envelope_json: &str) -> Option<NearbyEnvelope> {
106 let value: Value = serde_json::from_str(envelope_json).ok()?;
107 if value.get("peer_id").is_some() {
108 return None;
109 }
110 let envelope: NearbyEnvelope = serde_json::from_value(value).ok()?;
111 validate_nearby_envelope(&envelope).then_some(envelope)
112}
113
114pub fn encode_nearby_envelope_frame(envelope: &NearbyEnvelope) -> Option<Vec<u8>> {
115 encode_nearby_frame_json(&encode_nearby_envelope_json(envelope)?)
116}
117
118pub fn decode_nearby_envelope_frame(frame: &[u8]) -> Option<NearbyEnvelope> {
119 decode_nearby_envelope_json(&decode_nearby_frame_json(frame)?)
120}
121
122fn validate_nearby_envelope(envelope: &NearbyEnvelope) -> bool {
123 if envelope.version() != NEARBY_ENVELOPE_VERSION {
124 return false;
125 }
126 match envelope {
127 NearbyEnvelope::Hello { .. } => true,
128 NearbyEnvelope::Inv {
129 id, author, size, ..
130 } => {
131 is_hex_id(id)
132 && author.as_ref().is_none_or(|author| is_hex_id(author))
133 && (1..=NEARBY_MAX_FRAME_BODY_BYTES as u64).contains(size)
134 }
135 NearbyEnvelope::Want { id, .. } => is_hex_id(id),
136 NearbyEnvelope::Event { event_json, .. } => {
137 !event_json.is_empty() && event_json.len() <= NEARBY_MAX_FRAME_BODY_BYTES
138 }
139 }
140}
141
142fn is_hex_id(value: &str) -> bool {
143 value.len() == 64 && value.bytes().all(|byte| byte.is_ascii_hexdigit())
144}
145
146pub fn encode_nearby_frame_json(envelope_json: &str) -> Option<Vec<u8>> {
147 let envelope: Value = serde_json::from_str(envelope_json).ok()?;
148 if !envelope.is_object() {
149 return None;
150 }
151 let payload = serde_json::to_vec(&envelope).ok()?;
152 if payload.is_empty() || payload.len() > NEARBY_MAX_FRAME_BODY_BYTES {
153 return None;
154 }
155
156 let compressed = compress_if_beneficial(&payload);
157 let body = compressed.as_deref().unwrap_or(&payload);
158 if body.len() > NEARBY_MAX_FRAME_BODY_BYTES {
159 return None;
160 }
161
162 let mut frame = Vec::with_capacity(NEARBY_FRAME_HEADER_BYTES + body.len());
163 frame.extend_from_slice(MAGIC);
164 frame.push(if compressed.is_some() {
165 COMPRESSED_FLAG
166 } else {
167 0
168 });
169 frame.extend_from_slice(&(body.len() as u32).to_be_bytes());
170 frame.extend_from_slice(&(payload.len() as u32).to_be_bytes());
171 frame.extend_from_slice(body);
172 Some(frame)
173}
174
175pub fn decode_nearby_frame_json(frame: &[u8]) -> Option<String> {
176 if frame.len() < NEARBY_FRAME_HEADER_BYTES || &frame[..4] != MAGIC {
177 return None;
178 }
179 let flags = frame[4];
180 if flags & !COMPRESSED_FLAG != 0 {
181 return None;
182 }
183
184 let body_len = u32::from_be_bytes(frame[5..9].try_into().ok()?) as usize;
185 let original_len = u32::from_be_bytes(frame[9..13].try_into().ok()?) as usize;
186 if body_len == 0
187 || original_len == 0
188 || body_len > NEARBY_MAX_FRAME_BODY_BYTES
189 || original_len > NEARBY_MAX_FRAME_BODY_BYTES
190 || frame.len() != NEARBY_FRAME_HEADER_BYTES + body_len
191 {
192 return None;
193 }
194
195 let body = &frame[NEARBY_FRAME_HEADER_BYTES..];
196 let payload = if flags & COMPRESSED_FLAG != 0 {
197 decompress(body, original_len)?
198 } else {
199 if body_len != original_len {
200 return None;
201 }
202 body.to_vec()
203 };
204
205 let envelope: Value = serde_json::from_slice(&payload).ok()?;
206 if !envelope.is_object() {
207 return None;
208 }
209 serde_json::to_string(&envelope).ok()
210}
211
212pub fn nearby_frame_body_len_from_header(header: &[u8]) -> Option<usize> {
213 if header.len() < NEARBY_FRAME_HEADER_BYTES || &header[..4] != MAGIC {
214 return None;
215 }
216 let body_len = u32::from_be_bytes(header[5..9].try_into().ok()?) as usize;
217 if body_len == 0 || body_len > NEARBY_MAX_FRAME_BODY_BYTES {
218 return None;
219 }
220 Some(body_len)
221}
222
223pub fn read_nearby_frame<R: Read>(reader: &mut R) -> io::Result<Option<Vec<u8>>> {
224 let mut header = [0u8; NEARBY_FRAME_HEADER_BYTES];
225 if !read_exact_or_eof(reader, &mut header)? {
226 return Ok(None);
227 }
228 let body_len = nearby_frame_body_len_from_header(&header)
229 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid nearby frame header"))?;
230 let mut frame = Vec::with_capacity(NEARBY_FRAME_HEADER_BYTES + body_len);
231 frame.extend_from_slice(&header);
232 let start = frame.len();
233 frame.resize(start + body_len, 0);
234 reader.read_exact(&mut frame[start..])?;
235 Ok(Some(frame))
236}
237
238#[derive(Debug)]
239pub struct NearbyFrameAssembler {
240 buffer: Vec<u8>,
241}
242
243impl NearbyFrameAssembler {
244 pub fn new() -> Self {
245 Self { buffer: Vec::new() }
246 }
247
248 pub fn append(&mut self, chunk: &[u8]) -> Vec<Vec<u8>> {
249 self.buffer.extend_from_slice(chunk);
250 let mut frames = Vec::new();
251 while self.buffer.len() >= NEARBY_FRAME_HEADER_BYTES {
252 let Some(body_len) =
253 nearby_frame_body_len_from_header(&self.buffer[..NEARBY_FRAME_HEADER_BYTES])
254 else {
255 self.buffer.remove(0);
256 continue;
257 };
258 let frame_len = NEARBY_FRAME_HEADER_BYTES + body_len;
259 if self.buffer.len() < frame_len {
260 break;
261 }
262 frames.push(self.buffer.drain(..frame_len).collect());
263 }
264 frames
265 }
266}
267
268impl Default for NearbyFrameAssembler {
269 fn default() -> Self {
270 Self::new()
271 }
272}
273
274fn read_exact_or_eof<R: Read>(reader: &mut R, buffer: &mut [u8]) -> io::Result<bool> {
275 let mut offset = 0;
276 while offset < buffer.len() {
277 match reader.read(&mut buffer[offset..]) {
278 Ok(0) if offset == 0 => return Ok(false),
279 Ok(0) => {
280 return Err(io::Error::new(
281 io::ErrorKind::UnexpectedEof,
282 "partial nearby frame header",
283 ))
284 }
285 Ok(read) => offset += read,
286 Err(error) if error.kind() == io::ErrorKind::Interrupted => {}
287 Err(error) => return Err(error),
288 }
289 }
290 Ok(true)
291}
292
293fn compress_if_beneficial(data: &[u8]) -> Option<Vec<u8>> {
294 if data.len() < COMPRESSION_THRESHOLD {
295 return None;
296 }
297 let mut encoder = DeflateEncoder::new(Vec::new(), Compression::default());
298 use std::io::Write;
299 encoder.write_all(data).ok()?;
300 let compressed = encoder.finish().ok()?;
301 if compressed.is_empty() || compressed.len() >= data.len() {
302 return None;
303 }
304 Some(compressed)
305}
306
307fn decompress(data: &[u8], original_len: usize) -> Option<Vec<u8>> {
308 let mut decoder = DeflateDecoder::new(data);
309 let mut output = Vec::with_capacity(original_len);
310 decoder.read_to_end(&mut output).ok()?;
311 if output.len() != original_len || output.len() > NEARBY_MAX_FRAME_BODY_BYTES {
312 return None;
313 }
314 Some(output)
315}
316
317#[cfg(test)]
318mod tests {
319 use super::*;
320
321 #[test]
322 fn nearby_frame_round_trips_json() {
323 let frame = encode_nearby_frame_json(r#"{"v":1,"type":"hello"}"#).unwrap();
324 assert_eq!(&frame[..4], b"IRIS");
325 assert_eq!(
326 nearby_frame_body_len_from_header(&frame[..NEARBY_FRAME_HEADER_BYTES]),
327 Some(frame.len() - NEARBY_FRAME_HEADER_BYTES)
328 );
329
330 let decoded = decode_nearby_frame_json(&frame).unwrap();
331 let value: Value = serde_json::from_str(&decoded).unwrap();
332 assert_eq!(value["type"], "hello");
333 assert!(value.get("peer_id").is_none());
334 }
335
336 #[test]
337 fn nearby_typed_envelopes_have_no_peer_id_and_atomic_want() {
338 let id = "a".repeat(64);
339 let author = "b".repeat(64);
340 let inv = NearbyEnvelope::inv(NearbyInventoryItem {
341 id: id.clone(),
342 author: Some(author.clone()),
343 kind: 1059,
344 created_at: 1_700_000_000,
345 size: 512,
346 });
347 let frame = encode_nearby_envelope_frame(&inv).unwrap();
348 let json = decode_nearby_frame_json(&frame).unwrap();
349 let value: Value = serde_json::from_str(&json).unwrap();
350 assert_eq!(value["type"], "inv");
351 assert_eq!(value["id"], id);
352 assert_eq!(value["author"], author);
353 assert!(value.get("peer_id").is_none());
354 assert!(value.get("events").is_none());
355
356 let want = NearbyEnvelope::want(id.clone());
357 let frame = encode_nearby_envelope_frame(&want).unwrap();
358 let decoded = decode_nearby_envelope_frame(&frame).unwrap();
359 assert_eq!(decoded, want);
360 let json = decode_nearby_frame_json(&frame).unwrap();
361 let value: Value = serde_json::from_str(&json).unwrap();
362 assert_eq!(value["type"], "want");
363 assert_eq!(value["id"], id);
364 assert!(value.get("ids").is_none());
365 assert!(value.get("peer_id").is_none());
366 }
367
368 #[test]
369 fn nearby_typed_envelope_rejects_peer_id_field() {
370 assert!(decode_nearby_envelope_json(r#"{"v":1,"type":"hello","peer_id":"abc"}"#).is_none());
371 }
372
373 #[test]
374 fn nearby_frame_rejects_zlib_wrapped_payload() {
375 let payload = br#"{"v":1,"type":"hello"}"#;
376 let mut encoder =
377 flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::default());
378 use std::io::Write;
379 encoder.write_all(payload).unwrap();
380 let body = encoder.finish().unwrap();
381
382 let mut frame = Vec::new();
383 frame.extend_from_slice(b"IRIS");
384 frame.push(COMPRESSED_FLAG);
385 frame.extend_from_slice(&(body.len() as u32).to_be_bytes());
386 frame.extend_from_slice(&(payload.len() as u32).to_be_bytes());
387 frame.extend_from_slice(&body);
388
389 assert!(decode_nearby_frame_json(&frame).is_none());
390 }
391}