polysig_protocol/encoding/v1/
mod.rs

1use async_trait::async_trait;
2use binary_stream::futures::{
3    BinaryReader, BinaryWriter, Decodable, Encodable,
4};
5use futures::io::{AsyncRead, AsyncSeek, AsyncWrite};
6use std::io::Result;
7
8use crate::{
9    encoding::{
10        decode_preamble, encode_preamble, encoding_error, types,
11        MAX_BUFFER_SIZE,
12    },
13    Chunk, Encoding, Error, HandshakeMessage, OpaqueMessage,
14    RequestMessage, ResponseMessage, SealedEnvelope, ServerMessage,
15    SessionId, SessionRequest, SessionState, TransparentMessage,
16};
17
18/// Version for binary encoding.
19pub const VERSION: u16 = 1;
20
21/// Encode a length-prefixed buffer.
22async fn encode_buffer<W: AsyncWrite + AsyncSeek + Unpin + Send>(
23    writer: &mut BinaryWriter<W>,
24    buffer: &[u8],
25) -> Result<()> {
26    if buffer.len() > MAX_BUFFER_SIZE {
27        return Err(encoding_error(Error::MaxBufferSize(
28            MAX_BUFFER_SIZE,
29        )));
30    }
31    writer.write_u16(buffer.len() as u16).await?;
32    writer.write_bytes(buffer).await?;
33    Ok(())
34}
35
36/// Decode a length-prefixed buffer.
37async fn decode_buffer<R: AsyncRead + AsyncSeek + Unpin + Send>(
38    reader: &mut BinaryReader<R>,
39) -> Result<Vec<u8>> {
40    let size = reader.read_u16().await?;
41    let buf = reader.read_bytes(size as usize).await?;
42    Ok(buf)
43}
44
45/// Encode an encrypted payload with an additional length prefix
46/// indicating the length of the encrypted buffer.
47async fn encode_payload<W: AsyncWrite + AsyncSeek + Unpin + Send>(
48    writer: &mut BinaryWriter<W>,
49    length: &usize,
50    buffer: &[u8],
51) -> Result<()> {
52    if *length > MAX_BUFFER_SIZE {
53        return Err(encoding_error(Error::MaxBufferSize(
54            MAX_BUFFER_SIZE,
55        )));
56    }
57    writer.write_u16(*length as u16).await?;
58    encode_buffer(writer, buffer).await?;
59    Ok(())
60}
61
62/// Decode an encrypted payload with an additional length prefix
63/// indicating the length of the encrypted buffer.
64async fn decode_payload<R: AsyncRead + AsyncSeek + Unpin + Send>(
65    reader: &mut BinaryReader<R>,
66) -> Result<(usize, Vec<u8>)> {
67    let length = reader.read_u16().await? as usize;
68    let buffer = decode_buffer(reader).await?;
69    Ok((length, buffer))
70}
71
72//#[cfg_attr(target_arch="wasm32", async_trait(?Send))]
73//#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
74#[async_trait]
75impl Encodable for HandshakeMessage {
76    async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
77        &self,
78        writer: &mut BinaryWriter<W>,
79    ) -> Result<()> {
80        let id: u8 = self.into();
81        writer.write_u8(id).await?;
82        match self {
83            Self::Initiator(len, buf) => {
84                encode_payload(writer, len, buf).await?;
85            }
86            Self::Responder(len, buf) => {
87                encode_payload(writer, len, buf).await?;
88            }
89            Self::Noop => unreachable!(),
90        }
91        Ok(())
92    }
93}
94
95//#[cfg_attr(target_arch="wasm32", async_trait(?Send))]
96//#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
97
98#[async_trait]
99impl Decodable for HandshakeMessage {
100    async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
101        &mut self,
102        reader: &mut BinaryReader<R>,
103    ) -> Result<()> {
104        let id = reader.read_u8().await?;
105        match id {
106            types::HANDSHAKE_INITIATOR => {
107                let (len, buf) = decode_payload(reader).await?;
108                *self = HandshakeMessage::Initiator(len, buf);
109            }
110            types::HANDSHAKE_RESPONDER => {
111                let (len, buf) = decode_payload(reader).await?;
112                *self = HandshakeMessage::Responder(len, buf);
113            }
114            _ => {
115                return Err(encoding_error(
116                    crate::Error::EncodingKind(id),
117                ))
118            }
119        }
120        Ok(())
121    }
122}
123
124//#[cfg_attr(target_arch="wasm32", async_trait(?Send))]
125//#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
126
127#[async_trait]
128impl Encodable for TransparentMessage {
129    async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
130        &self,
131        writer: &mut BinaryWriter<W>,
132    ) -> Result<()> {
133        let id: u8 = self.into();
134        writer.write_u8(id).await?;
135        match self {
136            Self::Error(code, message) => {
137                let code: u16 = (*code).into();
138                writer.write_u16(code).await?;
139                writer.write_string(message).await?;
140            }
141            Self::ServerHandshake(message) => {
142                message.encode(writer).await?;
143            }
144            Self::PeerHandshake {
145                public_key,
146                message,
147            } => {
148                encode_buffer(writer, public_key).await?;
149                message.encode(writer).await?;
150            }
151            Self::Noop => unreachable!(),
152        }
153        Ok(())
154    }
155}
156
157//#[cfg_attr(target_arch="wasm32", async_trait(?Send))]
158//#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
159
160#[async_trait]
161impl Decodable for TransparentMessage {
162    async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
163        &mut self,
164        reader: &mut BinaryReader<R>,
165    ) -> Result<()> {
166        let id = reader.read_u8().await?;
167        match id {
168            types::ERROR => {
169                let code = reader
170                    .read_u16()
171                    .await?
172                    .try_into()
173                    .map_err(encoding_error)?;
174                let message = reader.read_string().await?;
175                *self = TransparentMessage::Error(code, message);
176            }
177            types::HANDSHAKE_SERVER => {
178                let mut message: HandshakeMessage =
179                    Default::default();
180                message.decode(reader).await?;
181                *self = TransparentMessage::ServerHandshake(message);
182            }
183            types::HANDSHAKE_PEER => {
184                let public_key = decode_buffer(reader).await?;
185                let mut message: HandshakeMessage =
186                    Default::default();
187                message.decode(reader).await?;
188                *self = TransparentMessage::PeerHandshake {
189                    public_key,
190                    message,
191                };
192            }
193            _ => {
194                return Err(encoding_error(
195                    crate::Error::EncodingKind(id),
196                ))
197            }
198        }
199        Ok(())
200    }
201}
202
203//#[cfg_attr(target_arch="wasm32", async_trait(?Send))]
204//#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
205
206#[async_trait]
207impl Encodable for ServerMessage {
208    async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
209        &self,
210        writer: &mut BinaryWriter<W>,
211    ) -> Result<()> {
212        let id: u8 = self.into();
213        writer.write_u8(id).await?;
214        match self {
215            Self::Error(code, message) => {
216                let code: u16 = (*code).into();
217                writer.write_u16(code).await?;
218                writer.write_string(message).await?;
219            }
220            Self::NewSession(request) => {
221                request.encode(writer).await?;
222            }
223            Self::SessionConnection {
224                session_id,
225                peer_key,
226            } => {
227                writer.write_bytes(session_id.as_bytes()).await?;
228                encode_buffer(writer, peer_key).await?;
229            }
230            Self::SessionCreated(response) => {
231                response.encode(writer).await?;
232            }
233            Self::SessionReady(response) => {
234                response.encode(writer).await?;
235            }
236            Self::SessionActive(response) => {
237                response.encode(writer).await?;
238            }
239            Self::SessionTimeout(session_id) => {
240                writer.write_bytes(session_id.as_bytes()).await?;
241            }
242            Self::CloseSession(session_id) => {
243                writer.write_bytes(session_id.as_bytes()).await?;
244            }
245            Self::SessionFinished(session_id) => {
246                writer.write_bytes(session_id.as_bytes()).await?;
247            }
248            Self::Noop => unreachable!(),
249        }
250        Ok(())
251    }
252}
253
254//#[cfg_attr(target_arch="wasm32", async_trait(?Send))]
255//#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
256
257#[async_trait]
258impl Decodable for ServerMessage {
259    async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
260        &mut self,
261        reader: &mut BinaryReader<R>,
262    ) -> Result<()> {
263        let id = reader.read_u8().await?;
264        match id {
265            types::ERROR => {
266                let code = reader
267                    .read_u16()
268                    .await?
269                    .try_into()
270                    .map_err(encoding_error)?;
271                let message = reader.read_string().await?;
272                *self = ServerMessage::Error(code, message);
273            }
274            types::SESSION_NEW => {
275                let mut session: SessionRequest = Default::default();
276                session.decode(reader).await?;
277                *self = ServerMessage::NewSession(session);
278            }
279            types::SESSION_CONNECTION => {
280                let session_id = SessionId::from_bytes(
281                    reader
282                        .read_bytes(16)
283                        .await?
284                        .as_slice()
285                        .try_into()
286                        .map_err(encoding_error)?,
287                );
288                let peer_key = decode_buffer(reader).await?;
289
290                *self = ServerMessage::SessionConnection {
291                    session_id,
292                    peer_key,
293                };
294            }
295            types::SESSION_CREATED => {
296                let mut session: SessionState = Default::default();
297                session.decode(reader).await?;
298                *self = ServerMessage::SessionCreated(session);
299            }
300            types::SESSION_READY => {
301                let mut session: SessionState = Default::default();
302                session.decode(reader).await?;
303                *self = ServerMessage::SessionReady(session);
304            }
305            types::SESSION_ACTIVE => {
306                let mut session: SessionState = Default::default();
307                session.decode(reader).await?;
308                *self = ServerMessage::SessionActive(session);
309            }
310            types::SESSION_TIMEOUT => {
311                let session_id = SessionId::from_bytes(
312                    reader
313                        .read_bytes(16)
314                        .await?
315                        .as_slice()
316                        .try_into()
317                        .map_err(encoding_error)?,
318                );
319                *self = ServerMessage::SessionTimeout(session_id);
320            }
321            types::SESSION_CLOSE => {
322                let session_id = SessionId::from_bytes(
323                    reader
324                        .read_bytes(16)
325                        .await?
326                        .as_slice()
327                        .try_into()
328                        .map_err(encoding_error)?,
329                );
330                *self = ServerMessage::CloseSession(session_id);
331            }
332            types::SESSION_FINISHED => {
333                let session_id = SessionId::from_bytes(
334                    reader
335                        .read_bytes(16)
336                        .await?
337                        .as_slice()
338                        .try_into()
339                        .map_err(encoding_error)?,
340                );
341                *self = ServerMessage::SessionFinished(session_id);
342            }
343            _ => {
344                return Err(encoding_error(
345                    crate::Error::EncodingKind(id),
346                ))
347            }
348        }
349        Ok(())
350    }
351}
352
353//#[cfg_attr(target_arch="wasm32", async_trait(?Send))]
354//#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
355
356#[async_trait]
357impl Encodable for OpaqueMessage {
358    async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
359        &self,
360        writer: &mut BinaryWriter<W>,
361    ) -> Result<()> {
362        let id: u8 = self.into();
363        writer.write_u8(id).await?;
364        match self {
365            Self::ServerMessage(envelope) => {
366                envelope.encode(writer).await?;
367            }
368            Self::PeerMessage {
369                public_key,
370                session_id,
371                envelope,
372            } => {
373                encode_buffer(writer, public_key).await?;
374                writer.write_bool(session_id.is_some()).await?;
375                if let Some(id) = session_id {
376                    writer.write_bytes(id.as_bytes()).await?;
377                }
378                envelope.encode(writer).await?;
379            }
380            Self::Noop => unreachable!(),
381        }
382        Ok(())
383    }
384}
385
386//#[cfg_attr(target_arch="wasm32", async_trait(?Send))]
387//#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
388
389#[async_trait]
390impl Decodable for OpaqueMessage {
391    async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
392        &mut self,
393        reader: &mut BinaryReader<R>,
394    ) -> Result<()> {
395        let id = reader.read_u8().await?;
396        match id {
397            types::OPAQUE_SERVER => {
398                let mut envelope: SealedEnvelope = Default::default();
399                envelope.decode(reader).await?;
400                *self = OpaqueMessage::ServerMessage(envelope);
401            }
402            types::OPAQUE_PEER => {
403                let public_key = decode_buffer(reader).await?;
404                let has_session_id = reader.read_bool().await?;
405                let session_id = if has_session_id {
406                    let session_id = SessionId::from_bytes(
407                        reader
408                            .read_bytes(16)
409                            .await?
410                            .as_slice()
411                            .try_into()
412                            .map_err(encoding_error)?,
413                    );
414                    Some(session_id)
415                } else {
416                    None
417                };
418
419                let mut envelope: SealedEnvelope = Default::default();
420                envelope.decode(reader).await?;
421
422                *self = OpaqueMessage::PeerMessage {
423                    public_key,
424                    session_id,
425                    envelope,
426                };
427            }
428            _ => {
429                return Err(encoding_error(
430                    crate::Error::EncodingKind(id),
431                ))
432            }
433        }
434        Ok(())
435    }
436}
437
438//#[cfg_attr(target_arch="wasm32", async_trait(?Send))]
439//#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
440
441#[async_trait]
442impl Encodable for RequestMessage {
443    async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
444        &self,
445        writer: &mut BinaryWriter<W>,
446    ) -> Result<()> {
447        encode_preamble(writer).await?;
448        let id: u8 = self.into();
449        writer.write_u8(id).await?;
450        match self {
451            Self::Transparent(message) => {
452                message.encode(writer).await?;
453            }
454            Self::Opaque(message) => {
455                message.encode(writer).await?;
456            }
457            Self::Noop => unreachable!(),
458        }
459        Ok(())
460    }
461}
462
463//#[cfg_attr(target_arch="wasm32", async_trait(?Send))]
464//#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
465
466#[async_trait]
467impl Decodable for RequestMessage {
468    async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
469        &mut self,
470        reader: &mut BinaryReader<R>,
471    ) -> Result<()> {
472        decode_preamble(reader).await?;
473        let id = reader.read_u8().await?;
474        match id {
475            types::TRANSPARENT => {
476                let mut message: TransparentMessage =
477                    Default::default();
478                message.decode(reader).await?;
479                *self = RequestMessage::Transparent(message);
480            }
481            types::OPAQUE => {
482                let mut message: OpaqueMessage = Default::default();
483                message.decode(reader).await?;
484                *self = RequestMessage::Opaque(message);
485            }
486            _ => {
487                return Err(encoding_error(
488                    crate::Error::EncodingKind(id),
489                ))
490            }
491        }
492        Ok(())
493    }
494}
495
496//#[cfg_attr(target_arch="wasm32", async_trait(?Send))]
497//#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
498
499#[async_trait]
500impl Encodable for ResponseMessage {
501    async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
502        &self,
503        writer: &mut BinaryWriter<W>,
504    ) -> Result<()> {
505        encode_preamble(writer).await?;
506        let id: u8 = self.into();
507        writer.write_u8(id).await?;
508        match self {
509            Self::Transparent(message) => {
510                message.encode(&mut *writer).await?;
511            }
512            Self::Opaque(message) => {
513                message.encode(&mut *writer).await?;
514            }
515            Self::Noop => unreachable!(),
516        }
517        Ok(())
518    }
519}
520
521//#[cfg_attr(target_arch="wasm32", async_trait(?Send))]
522//#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
523
524#[async_trait]
525impl Decodable for ResponseMessage {
526    async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
527        &mut self,
528        reader: &mut BinaryReader<R>,
529    ) -> Result<()> {
530        decode_preamble(reader).await?;
531        let id = reader.read_u8().await?;
532        match id {
533            types::TRANSPARENT => {
534                let mut message: TransparentMessage =
535                    Default::default();
536                message.decode(reader).await?;
537                *self = ResponseMessage::Transparent(message);
538            }
539            types::OPAQUE => {
540                let mut message: OpaqueMessage = Default::default();
541                message.decode(reader).await?;
542                *self = ResponseMessage::Opaque(message);
543            }
544            _ => {
545                return Err(encoding_error(
546                    crate::Error::EncodingKind(id),
547                ))
548            }
549        }
550        Ok(())
551    }
552}
553
554#[async_trait]
555impl Encodable for Chunk {
556    async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
557        &self,
558        writer: &mut BinaryWriter<W>,
559    ) -> Result<()> {
560        encode_payload(writer, &self.length, &self.contents).await?;
561        Ok(())
562    }
563}
564
565#[async_trait]
566impl Decodable for Chunk {
567    async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
568        &mut self,
569        reader: &mut BinaryReader<R>,
570    ) -> Result<()> {
571        let (length, contents) = decode_payload(reader).await?;
572        self.length = length;
573        self.contents = contents;
574        Ok(())
575    }
576}
577
578#[async_trait]
579impl Encodable for SealedEnvelope {
580    async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
581        &self,
582        writer: &mut BinaryWriter<W>,
583    ) -> Result<()> {
584        let id: u8 = self.encoding.into();
585        writer.write_u8(id).await?;
586        writer.write_bool(self.broadcast).await?;
587
588        writer.write_u32(self.chunks.len() as u32).await?;
589        for chunk in &self.chunks {
590            chunk.encode(writer).await?;
591        }
592        Ok(())
593    }
594}
595
596#[async_trait]
597impl Decodable for SealedEnvelope {
598    async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
599        &mut self,
600        reader: &mut BinaryReader<R>,
601    ) -> Result<()> {
602        let id = reader.read_u8().await?;
603        match id {
604            types::ENCODING_BLOB => {
605                self.encoding = Encoding::Blob;
606            }
607            types::ENCODING_JSON => {
608                self.encoding = Encoding::Json;
609            }
610            _ => {
611                return Err(encoding_error(
612                    crate::Error::EncodingKind(id),
613                ))
614            }
615        }
616        self.broadcast = reader.read_bool().await?;
617
618        let num_chunks = reader.read_u32().await?;
619        for _ in 0..num_chunks {
620            let mut chunk: Chunk = Default::default();
621            chunk.decode(&mut *reader).await?;
622            self.chunks.push(chunk);
623        }
624
625        Ok(())
626    }
627}
628
629#[async_trait]
630impl Encodable for SessionRequest {
631    async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
632        &self,
633        writer: &mut BinaryWriter<W>,
634    ) -> Result<()> {
635        // TODO: handle too many participants
636        writer.write_u16(self.participant_keys.len() as u16).await?;
637        for key in self.participant_keys.iter() {
638            encode_buffer(writer, key).await?;
639        }
640        Ok(())
641    }
642}
643
644#[async_trait]
645impl Decodable for SessionRequest {
646    async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
647        &mut self,
648        reader: &mut BinaryReader<R>,
649    ) -> Result<()> {
650        let size = reader.read_u16().await? as usize;
651        for _ in 0..size {
652            let key = decode_buffer(reader).await?;
653            self.participant_keys.push(key);
654        }
655        Ok(())
656    }
657}
658
659#[async_trait]
660impl Encodable for SessionState {
661    async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
662        &self,
663        writer: &mut BinaryWriter<W>,
664    ) -> Result<()> {
665        writer.write_bytes(self.session_id.as_bytes()).await?;
666        writer.write_u16(self.all_participants.len() as u16).await?;
667        for key in &self.all_participants {
668            encode_buffer(writer, key).await?;
669        }
670        Ok(())
671    }
672}
673
674#[async_trait]
675impl Decodable for SessionState {
676    async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
677        &mut self,
678        reader: &mut BinaryReader<R>,
679    ) -> Result<()> {
680        self.session_id = SessionId::from_bytes(
681            reader
682                .read_bytes(16)
683                .await?
684                .as_slice()
685                .try_into()
686                .map_err(encoding_error)?,
687        );
688        let size = reader.read_u16().await? as usize;
689        for _ in 0..size {
690            let key = decode_buffer(reader).await?;
691            self.all_participants.push(key);
692        }
693        Ok(())
694    }
695}