Skip to main content

iris_chat_protocol/
nearby.rs

1use std::io::{self, Read};
2
3use flate2::read::DeflateDecoder;
4use flate2::write::DeflateEncoder;
5use flate2::Compression;
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8
9const MAGIC: &[u8; 4] = b"IRIS";
10const COMPRESSED_FLAG: u8 = 0x01;
11const COMPRESSION_THRESHOLD: usize = 100;
12
13pub const NEARBY_FRAME_HEADER_BYTES: usize = 13;
14pub const NEARBY_MAX_FRAME_BODY_BYTES: usize = 256 * 1024;
15pub const NEARBY_ENVELOPE_VERSION: u8 = 1;
16
17#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
18pub struct NearbyInventoryItem {
19    pub id: String,
20    #[serde(skip_serializing_if = "Option::is_none")]
21    pub author: Option<String>,
22    pub kind: u64,
23    pub created_at: u64,
24    pub size: u64,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(tag = "type")]
29pub enum NearbyEnvelope {
30    #[serde(rename = "hello")]
31    Hello {
32        v: u8,
33        #[serde(skip_serializing_if = "Option::is_none")]
34        nonce: Option<String>,
35        #[serde(skip_serializing_if = "Option::is_none")]
36        name: Option<String>,
37    },
38    #[serde(rename = "inv")]
39    Inv {
40        v: u8,
41        id: String,
42        #[serde(skip_serializing_if = "Option::is_none")]
43        author: Option<String>,
44        kind: u64,
45        created_at: u64,
46        size: u64,
47    },
48    #[serde(rename = "want")]
49    Want { v: u8, id: String },
50    #[serde(rename = "event")]
51    Event { v: u8, event_json: String },
52}
53
54impl NearbyEnvelope {
55    pub fn hello(nonce: Option<String>, name: Option<String>) -> Self {
56        Self::Hello {
57            v: NEARBY_ENVELOPE_VERSION,
58            nonce,
59            name,
60        }
61    }
62
63    pub fn inv(item: NearbyInventoryItem) -> Self {
64        Self::Inv {
65            v: NEARBY_ENVELOPE_VERSION,
66            id: item.id,
67            author: item.author,
68            kind: item.kind,
69            created_at: item.created_at,
70            size: item.size,
71        }
72    }
73
74    pub fn want(id: impl Into<String>) -> Self {
75        Self::Want {
76            v: NEARBY_ENVELOPE_VERSION,
77            id: id.into(),
78        }
79    }
80
81    pub fn event(event_json: impl Into<String>) -> Self {
82        Self::Event {
83            v: NEARBY_ENVELOPE_VERSION,
84            event_json: event_json.into(),
85        }
86    }
87
88    fn version(&self) -> u8 {
89        match self {
90            Self::Hello { v, .. }
91            | Self::Inv { v, .. }
92            | Self::Want { v, .. }
93            | Self::Event { v, .. } => *v,
94        }
95    }
96}
97
98pub fn encode_nearby_envelope_json(envelope: &NearbyEnvelope) -> Option<String> {
99    if !validate_nearby_envelope(envelope) {
100        return None;
101    }
102    serde_json::to_string(envelope).ok()
103}
104
105pub fn decode_nearby_envelope_json(envelope_json: &str) -> Option<NearbyEnvelope> {
106    let value: Value = serde_json::from_str(envelope_json).ok()?;
107    if value.get("peer_id").is_some() {
108        return None;
109    }
110    let envelope: NearbyEnvelope = serde_json::from_value(value).ok()?;
111    validate_nearby_envelope(&envelope).then_some(envelope)
112}
113
114pub fn encode_nearby_envelope_frame(envelope: &NearbyEnvelope) -> Option<Vec<u8>> {
115    encode_nearby_frame_json(&encode_nearby_envelope_json(envelope)?)
116}
117
118pub fn decode_nearby_envelope_frame(frame: &[u8]) -> Option<NearbyEnvelope> {
119    decode_nearby_envelope_json(&decode_nearby_frame_json(frame)?)
120}
121
122fn validate_nearby_envelope(envelope: &NearbyEnvelope) -> bool {
123    if envelope.version() != NEARBY_ENVELOPE_VERSION {
124        return false;
125    }
126    match envelope {
127        NearbyEnvelope::Hello { .. } => true,
128        NearbyEnvelope::Inv {
129            id, author, size, ..
130        } => {
131            is_hex_id(id)
132                && author.as_ref().is_none_or(|author| is_hex_id(author))
133                && (1..=NEARBY_MAX_FRAME_BODY_BYTES as u64).contains(size)
134        }
135        NearbyEnvelope::Want { id, .. } => is_hex_id(id),
136        NearbyEnvelope::Event { event_json, .. } => {
137            !event_json.is_empty() && event_json.len() <= NEARBY_MAX_FRAME_BODY_BYTES
138        }
139    }
140}
141
142fn is_hex_id(value: &str) -> bool {
143    value.len() == 64 && value.bytes().all(|byte| byte.is_ascii_hexdigit())
144}
145
146pub fn encode_nearby_frame_json(envelope_json: &str) -> Option<Vec<u8>> {
147    let envelope: Value = serde_json::from_str(envelope_json).ok()?;
148    if !envelope.is_object() {
149        return None;
150    }
151    let payload = serde_json::to_vec(&envelope).ok()?;
152    if payload.is_empty() || payload.len() > NEARBY_MAX_FRAME_BODY_BYTES {
153        return None;
154    }
155
156    let compressed = compress_if_beneficial(&payload);
157    let body = compressed.as_deref().unwrap_or(&payload);
158    if body.len() > NEARBY_MAX_FRAME_BODY_BYTES {
159        return None;
160    }
161
162    let mut frame = Vec::with_capacity(NEARBY_FRAME_HEADER_BYTES + body.len());
163    frame.extend_from_slice(MAGIC);
164    frame.push(if compressed.is_some() {
165        COMPRESSED_FLAG
166    } else {
167        0
168    });
169    frame.extend_from_slice(&(body.len() as u32).to_be_bytes());
170    frame.extend_from_slice(&(payload.len() as u32).to_be_bytes());
171    frame.extend_from_slice(body);
172    Some(frame)
173}
174
175pub fn decode_nearby_frame_json(frame: &[u8]) -> Option<String> {
176    if frame.len() < NEARBY_FRAME_HEADER_BYTES || &frame[..4] != MAGIC {
177        return None;
178    }
179    let flags = frame[4];
180    if flags & !COMPRESSED_FLAG != 0 {
181        return None;
182    }
183
184    let body_len = u32::from_be_bytes(frame[5..9].try_into().ok()?) as usize;
185    let original_len = u32::from_be_bytes(frame[9..13].try_into().ok()?) as usize;
186    if body_len == 0
187        || original_len == 0
188        || body_len > NEARBY_MAX_FRAME_BODY_BYTES
189        || original_len > NEARBY_MAX_FRAME_BODY_BYTES
190        || frame.len() != NEARBY_FRAME_HEADER_BYTES + body_len
191    {
192        return None;
193    }
194
195    let body = &frame[NEARBY_FRAME_HEADER_BYTES..];
196    let payload = if flags & COMPRESSED_FLAG != 0 {
197        decompress(body, original_len)?
198    } else {
199        if body_len != original_len {
200            return None;
201        }
202        body.to_vec()
203    };
204
205    let envelope: Value = serde_json::from_slice(&payload).ok()?;
206    if !envelope.is_object() {
207        return None;
208    }
209    serde_json::to_string(&envelope).ok()
210}
211
212pub fn nearby_frame_body_len_from_header(header: &[u8]) -> Option<usize> {
213    if header.len() < NEARBY_FRAME_HEADER_BYTES || &header[..4] != MAGIC {
214        return None;
215    }
216    let body_len = u32::from_be_bytes(header[5..9].try_into().ok()?) as usize;
217    if body_len == 0 || body_len > NEARBY_MAX_FRAME_BODY_BYTES {
218        return None;
219    }
220    Some(body_len)
221}
222
223pub fn read_nearby_frame<R: Read>(reader: &mut R) -> io::Result<Option<Vec<u8>>> {
224    let mut header = [0u8; NEARBY_FRAME_HEADER_BYTES];
225    if !read_exact_or_eof(reader, &mut header)? {
226        return Ok(None);
227    }
228    let body_len = nearby_frame_body_len_from_header(&header)
229        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid nearby frame header"))?;
230    let mut frame = Vec::with_capacity(NEARBY_FRAME_HEADER_BYTES + body_len);
231    frame.extend_from_slice(&header);
232    let start = frame.len();
233    frame.resize(start + body_len, 0);
234    reader.read_exact(&mut frame[start..])?;
235    Ok(Some(frame))
236}
237
238#[derive(Debug)]
239pub struct NearbyFrameAssembler {
240    buffer: Vec<u8>,
241}
242
243impl NearbyFrameAssembler {
244    pub fn new() -> Self {
245        Self { buffer: Vec::new() }
246    }
247
248    pub fn append(&mut self, chunk: &[u8]) -> Vec<Vec<u8>> {
249        self.buffer.extend_from_slice(chunk);
250        let mut frames = Vec::new();
251        while self.buffer.len() >= NEARBY_FRAME_HEADER_BYTES {
252            let Some(body_len) =
253                nearby_frame_body_len_from_header(&self.buffer[..NEARBY_FRAME_HEADER_BYTES])
254            else {
255                self.buffer.remove(0);
256                continue;
257            };
258            let frame_len = NEARBY_FRAME_HEADER_BYTES + body_len;
259            if self.buffer.len() < frame_len {
260                break;
261            }
262            frames.push(self.buffer.drain(..frame_len).collect());
263        }
264        frames
265    }
266}
267
268impl Default for NearbyFrameAssembler {
269    fn default() -> Self {
270        Self::new()
271    }
272}
273
274fn read_exact_or_eof<R: Read>(reader: &mut R, buffer: &mut [u8]) -> io::Result<bool> {
275    let mut offset = 0;
276    while offset < buffer.len() {
277        match reader.read(&mut buffer[offset..]) {
278            Ok(0) if offset == 0 => return Ok(false),
279            Ok(0) => {
280                return Err(io::Error::new(
281                    io::ErrorKind::UnexpectedEof,
282                    "partial nearby frame header",
283                ))
284            }
285            Ok(read) => offset += read,
286            Err(error) if error.kind() == io::ErrorKind::Interrupted => {}
287            Err(error) => return Err(error),
288        }
289    }
290    Ok(true)
291}
292
293fn compress_if_beneficial(data: &[u8]) -> Option<Vec<u8>> {
294    if data.len() < COMPRESSION_THRESHOLD {
295        return None;
296    }
297    let mut encoder = DeflateEncoder::new(Vec::new(), Compression::default());
298    use std::io::Write;
299    encoder.write_all(data).ok()?;
300    let compressed = encoder.finish().ok()?;
301    if compressed.is_empty() || compressed.len() >= data.len() {
302        return None;
303    }
304    Some(compressed)
305}
306
307fn decompress(data: &[u8], original_len: usize) -> Option<Vec<u8>> {
308    let mut decoder = DeflateDecoder::new(data);
309    let mut output = Vec::with_capacity(original_len);
310    decoder.read_to_end(&mut output).ok()?;
311    if output.len() != original_len || output.len() > NEARBY_MAX_FRAME_BODY_BYTES {
312        return None;
313    }
314    Some(output)
315}
316
317#[cfg(test)]
318mod tests {
319    use super::*;
320
321    #[test]
322    fn nearby_frame_round_trips_json() {
323        let frame = encode_nearby_frame_json(r#"{"v":1,"type":"hello"}"#).unwrap();
324        assert_eq!(&frame[..4], b"IRIS");
325        assert_eq!(
326            nearby_frame_body_len_from_header(&frame[..NEARBY_FRAME_HEADER_BYTES]),
327            Some(frame.len() - NEARBY_FRAME_HEADER_BYTES)
328        );
329
330        let decoded = decode_nearby_frame_json(&frame).unwrap();
331        let value: Value = serde_json::from_str(&decoded).unwrap();
332        assert_eq!(value["type"], "hello");
333        assert!(value.get("peer_id").is_none());
334    }
335
336    #[test]
337    fn nearby_typed_envelopes_have_no_peer_id_and_atomic_want() {
338        let id = "a".repeat(64);
339        let author = "b".repeat(64);
340        let inv = NearbyEnvelope::inv(NearbyInventoryItem {
341            id: id.clone(),
342            author: Some(author.clone()),
343            kind: 1059,
344            created_at: 1_700_000_000,
345            size: 512,
346        });
347        let frame = encode_nearby_envelope_frame(&inv).unwrap();
348        let json = decode_nearby_frame_json(&frame).unwrap();
349        let value: Value = serde_json::from_str(&json).unwrap();
350        assert_eq!(value["type"], "inv");
351        assert_eq!(value["id"], id);
352        assert_eq!(value["author"], author);
353        assert!(value.get("peer_id").is_none());
354        assert!(value.get("events").is_none());
355
356        let want = NearbyEnvelope::want(id.clone());
357        let frame = encode_nearby_envelope_frame(&want).unwrap();
358        let decoded = decode_nearby_envelope_frame(&frame).unwrap();
359        assert_eq!(decoded, want);
360        let json = decode_nearby_frame_json(&frame).unwrap();
361        let value: Value = serde_json::from_str(&json).unwrap();
362        assert_eq!(value["type"], "want");
363        assert_eq!(value["id"], id);
364        assert!(value.get("ids").is_none());
365        assert!(value.get("peer_id").is_none());
366    }
367
368    #[test]
369    fn nearby_typed_envelope_rejects_peer_id_field() {
370        assert!(decode_nearby_envelope_json(r#"{"v":1,"type":"hello","peer_id":"abc"}"#).is_none());
371    }
372
373    #[test]
374    fn nearby_frame_rejects_zlib_wrapped_payload() {
375        let payload = br#"{"v":1,"type":"hello"}"#;
376        let mut encoder =
377            flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::default());
378        use std::io::Write;
379        encoder.write_all(payload).unwrap();
380        let body = encoder.finish().unwrap();
381
382        let mut frame = Vec::new();
383        frame.extend_from_slice(b"IRIS");
384        frame.push(COMPRESSED_FLAG);
385        frame.extend_from_slice(&(body.len() as u32).to_be_bytes());
386        frame.extend_from_slice(&(payload.len() as u32).to_be_bytes());
387        frame.extend_from_slice(&body);
388
389        assert!(decode_nearby_frame_json(&frame).is_none());
390    }
391}