mpc_protocol/encoding/v1/
mod.rs1use async_trait::async_trait;
2use binary_stream::futures::{
3 BinaryReader, BinaryWriter, Decodable, Encodable,
4};
5use futures::io::{AsyncRead, AsyncSeek, AsyncWrite};
6use std::{collections::HashSet, io::Result};
7
8use crate::{
9 encoding::{
10 decode_preamble, encode_preamble, encoding_error, types,
11 MAX_BUFFER_SIZE,
12 },
13 Chunk, Encoding, Error, HandshakeMessage, MeetingId,
14 MeetingState, OpaqueMessage, RequestMessage, ResponseMessage,
15 SealedEnvelope, ServerMessage, SessionId, SessionRequest,
16 SessionState, TransparentMessage,
17};
18
/// Version number of the encoding implemented by this module.
// NOTE(review): presumably the value carried in the message preamble — confirm
// against `encode_preamble` / `decode_preamble`.
pub const VERSION: u16 = 1;
21
22async fn encode_buffer<W: AsyncWrite + AsyncSeek + Unpin + Send>(
24 writer: &mut BinaryWriter<W>,
25 buffer: &[u8],
26) -> Result<()> {
27 if buffer.len() > MAX_BUFFER_SIZE {
28 return Err(encoding_error(Error::MaxBufferSize(
29 MAX_BUFFER_SIZE,
30 )));
31 }
32 writer.write_u16(buffer.len() as u16).await?;
33 writer.write_bytes(buffer).await?;
34 Ok(())
35}
36
37async fn decode_buffer<R: AsyncRead + AsyncSeek + Unpin + Send>(
39 reader: &mut BinaryReader<R>,
40) -> Result<Vec<u8>> {
41 let size = reader.read_u16().await?;
42 let buf = reader.read_bytes(size as usize).await?;
43 Ok(buf)
44}
45
46async fn encode_payload<W: AsyncWrite + AsyncSeek + Unpin + Send>(
49 writer: &mut BinaryWriter<W>,
50 length: &usize,
51 buffer: &[u8],
52) -> Result<()> {
53 if *length > MAX_BUFFER_SIZE {
54 return Err(encoding_error(Error::MaxBufferSize(
55 MAX_BUFFER_SIZE,
56 )));
57 }
58 writer.write_u16(*length as u16).await?;
59 encode_buffer(writer, buffer).await?;
60 Ok(())
61}
62
63async fn decode_payload<R: AsyncRead + AsyncSeek + Unpin + Send>(
66 reader: &mut BinaryReader<R>,
67) -> Result<(usize, Vec<u8>)> {
68 let length = reader.read_u16().await? as usize;
69 let buffer = decode_buffer(reader).await?;
70 Ok((length, buffer))
71}
72
73#[async_trait]
76impl Encodable for HandshakeMessage {
77 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
78 &self,
79 writer: &mut BinaryWriter<W>,
80 ) -> Result<()> {
81 let id: u8 = self.into();
82 writer.write_u8(id).await?;
83 match self {
84 Self::Initiator(len, buf) => {
85 encode_payload(writer, len, buf).await?;
86 }
87 Self::Responder(len, buf) => {
88 encode_payload(writer, len, buf).await?;
89 }
90 Self::Noop => unreachable!(),
91 }
92 Ok(())
93 }
94}
95
96#[async_trait]
100impl Decodable for HandshakeMessage {
101 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
102 &mut self,
103 reader: &mut BinaryReader<R>,
104 ) -> Result<()> {
105 let id = reader.read_u8().await?;
106 match id {
107 types::HANDSHAKE_INITIATOR => {
108 let (len, buf) = decode_payload(reader).await?;
109 *self = HandshakeMessage::Initiator(len, buf);
110 }
111 types::HANDSHAKE_RESPONDER => {
112 let (len, buf) = decode_payload(reader).await?;
113 *self = HandshakeMessage::Responder(len, buf);
114 }
115 _ => {
116 return Err(encoding_error(
117 crate::Error::EncodingKind(id),
118 ))
119 }
120 }
121 Ok(())
122 }
123}
124
125#[async_trait]
129impl Encodable for TransparentMessage {
130 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
131 &self,
132 writer: &mut BinaryWriter<W>,
133 ) -> Result<()> {
134 let id: u8 = self.into();
135 writer.write_u8(id).await?;
136 match self {
137 Self::Error(code, message) => {
138 let code: u16 = (*code).into();
139 writer.write_u16(code).await?;
140 writer.write_string(message).await?;
141 }
142 Self::ServerHandshake(message) => {
143 message.encode(writer).await?;
144 }
145 Self::PeerHandshake {
146 public_key,
147 message,
148 } => {
149 encode_buffer(writer, public_key).await?;
150 message.encode(writer).await?;
151 }
152 Self::Noop => unreachable!(),
153 }
154 Ok(())
155 }
156}
157
158#[async_trait]
162impl Decodable for TransparentMessage {
163 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
164 &mut self,
165 reader: &mut BinaryReader<R>,
166 ) -> Result<()> {
167 let id = reader.read_u8().await?;
168 match id {
169 types::ERROR => {
170 let code = reader
171 .read_u16()
172 .await?
173 .try_into()
174 .map_err(encoding_error)?;
175 let message = reader.read_string().await?;
176 *self = TransparentMessage::Error(code, message);
177 }
178 types::HANDSHAKE_SERVER => {
179 let mut message: HandshakeMessage =
180 Default::default();
181 message.decode(reader).await?;
182 *self = TransparentMessage::ServerHandshake(message);
183 }
184 types::HANDSHAKE_PEER => {
185 let public_key = decode_buffer(reader).await?;
186 let mut message: HandshakeMessage =
187 Default::default();
188 message.decode(reader).await?;
189 *self = TransparentMessage::PeerHandshake {
190 public_key,
191 message,
192 };
193 }
194 _ => {
195 return Err(encoding_error(
196 crate::Error::EncodingKind(id),
197 ))
198 }
199 }
200 Ok(())
201 }
202}
203
204#[async_trait]
208impl Encodable for ServerMessage {
209 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
210 &self,
211 writer: &mut BinaryWriter<W>,
212 ) -> Result<()> {
213 let id: u8 = self.into();
214 writer.write_u8(id).await?;
215 match self {
216 Self::Error(code, message) => {
217 let code: u16 = (*code).into();
218 writer.write_u16(code).await?;
219 writer.write_string(message).await?;
220 }
221 Self::NewMeeting {
222 owner_id,
223 slots,
224 data,
225 } => {
226 writer.write_bytes(owner_id.as_ref()).await?;
227 writer.write_u32(slots.len() as u32).await?;
228 for slot in slots {
229 writer.write_bytes(slot.as_ref()).await?;
230 }
231 writer
232 .write_string(serde_json::to_string(&data)?)
233 .await?;
234 }
235 Self::MeetingCreated(response) => {
236 response.encode(writer).await?;
237 }
238 Self::JoinMeeting(meeting_id, user_id) => {
239 writer.write_bytes(meeting_id.as_bytes()).await?;
240 writer.write_bytes(user_id.as_ref()).await?;
241 }
242 Self::MeetingReady(response) => {
243 response.encode(writer).await?;
244 }
245 Self::NewSession(request) => {
246 request.encode(writer).await?;
247 }
248 Self::SessionConnection {
249 session_id,
250 peer_key,
251 } => {
252 writer.write_bytes(session_id.as_bytes()).await?;
253 encode_buffer(writer, peer_key).await?;
254 }
255 Self::SessionCreated(response) => {
256 response.encode(writer).await?;
257 }
258 Self::SessionReady(response) => {
259 response.encode(writer).await?;
260 }
261 Self::SessionActive(response) => {
262 response.encode(writer).await?;
263 }
264 Self::SessionTimeout(session_id) => {
265 writer.write_bytes(session_id.as_bytes()).await?;
266 }
267 Self::CloseSession(session_id) => {
268 writer.write_bytes(session_id.as_bytes()).await?;
269 }
270 Self::SessionFinished(session_id) => {
271 writer.write_bytes(session_id.as_bytes()).await?;
272 }
273 Self::Noop => unreachable!(),
274 }
275 Ok(())
276 }
277}
278
279#[async_trait]
283impl Decodable for ServerMessage {
284 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
285 &mut self,
286 reader: &mut BinaryReader<R>,
287 ) -> Result<()> {
288 let id = reader.read_u8().await?;
289 match id {
290 types::ERROR => {
291 let code = reader
292 .read_u16()
293 .await?
294 .try_into()
295 .map_err(encoding_error)?;
296 let message = reader.read_string().await?;
297 *self = ServerMessage::Error(code, message);
298 }
299 types::MEETING_NEW => {
300 let owner_id: [u8; 32] =
301 reader.read_bytes(32).await?.try_into().unwrap();
302
303 let mut slots = HashSet::new();
304 let num_slots = reader.read_u32().await?;
305 for _ in 0..num_slots {
306 let slot: [u8; 32] = reader
307 .read_bytes(32)
308 .await?
309 .try_into()
310 .unwrap();
311 slots.insert(slot.into());
312 }
313 let data = reader.read_string().await?;
314 *self = ServerMessage::NewMeeting {
315 owner_id: owner_id.into(),
316 slots,
317 data: serde_json::from_str(&data)?,
318 };
319 }
320 types::MEETING_CREATED => {
321 let mut meeting: MeetingState = Default::default();
322 meeting.decode(reader).await?;
323 *self = ServerMessage::MeetingCreated(meeting);
324 }
325 types::MEETING_JOIN => {
326 let meeting_id = MeetingId::from_bytes(
327 reader
328 .read_bytes(16)
329 .await?
330 .as_slice()
331 .try_into()
332 .map_err(encoding_error)?,
333 );
334 let user_id: [u8; 32] =
335 reader.read_bytes(32).await?.try_into().unwrap();
336
337 *self = ServerMessage::JoinMeeting(
338 meeting_id,
339 user_id.into(),
340 );
341 }
342 types::MEETING_READY => {
343 let mut meeting: MeetingState = Default::default();
344 meeting.decode(reader).await?;
345 *self = ServerMessage::MeetingReady(meeting);
346 }
347 types::SESSION_NEW => {
348 let mut session: SessionRequest = Default::default();
349 session.decode(reader).await?;
350 *self = ServerMessage::NewSession(session);
351 }
352 types::SESSION_CONNECTION => {
353 let session_id = SessionId::from_bytes(
354 reader
355 .read_bytes(16)
356 .await?
357 .as_slice()
358 .try_into()
359 .map_err(encoding_error)?,
360 );
361 let peer_key = decode_buffer(reader).await?;
362
363 *self = ServerMessage::SessionConnection {
364 session_id,
365 peer_key,
366 };
367 }
368 types::SESSION_CREATED => {
369 let mut session: SessionState = Default::default();
370 session.decode(reader).await?;
371 *self = ServerMessage::SessionCreated(session);
372 }
373 types::SESSION_READY => {
374 let mut session: SessionState = Default::default();
375 session.decode(reader).await?;
376 *self = ServerMessage::SessionReady(session);
377 }
378 types::SESSION_ACTIVE => {
379 let mut session: SessionState = Default::default();
380 session.decode(reader).await?;
381 *self = ServerMessage::SessionActive(session);
382 }
383 types::SESSION_TIMEOUT => {
384 let session_id = SessionId::from_bytes(
385 reader
386 .read_bytes(16)
387 .await?
388 .as_slice()
389 .try_into()
390 .map_err(encoding_error)?,
391 );
392 *self = ServerMessage::SessionTimeout(session_id);
393 }
394 types::SESSION_CLOSE => {
395 let session_id = SessionId::from_bytes(
396 reader
397 .read_bytes(16)
398 .await?
399 .as_slice()
400 .try_into()
401 .map_err(encoding_error)?,
402 );
403 *self = ServerMessage::CloseSession(session_id);
404 }
405 types::SESSION_FINISHED => {
406 let session_id = SessionId::from_bytes(
407 reader
408 .read_bytes(16)
409 .await?
410 .as_slice()
411 .try_into()
412 .map_err(encoding_error)?,
413 );
414 *self = ServerMessage::SessionFinished(session_id);
415 }
416 _ => {
417 return Err(encoding_error(
418 crate::Error::EncodingKind(id),
419 ))
420 }
421 }
422 Ok(())
423 }
424}
425
426#[async_trait]
430impl Encodable for OpaqueMessage {
431 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
432 &self,
433 writer: &mut BinaryWriter<W>,
434 ) -> Result<()> {
435 let id: u8 = self.into();
436 writer.write_u8(id).await?;
437 match self {
438 Self::ServerMessage(envelope) => {
439 envelope.encode(writer).await?;
440 }
441 Self::PeerMessage {
442 public_key,
443 session_id,
444 envelope,
445 } => {
446 encode_buffer(writer, public_key).await?;
447 writer.write_bool(session_id.is_some()).await?;
448 if let Some(id) = session_id {
449 writer.write_bytes(id.as_bytes()).await?;
450 }
451 envelope.encode(writer).await?;
452 }
453 Self::Noop => unreachable!(),
454 }
455 Ok(())
456 }
457}
458
459#[async_trait]
463impl Decodable for OpaqueMessage {
464 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
465 &mut self,
466 reader: &mut BinaryReader<R>,
467 ) -> Result<()> {
468 let id = reader.read_u8().await?;
469 match id {
470 types::OPAQUE_SERVER => {
471 let mut envelope: SealedEnvelope = Default::default();
472 envelope.decode(reader).await?;
473 *self = OpaqueMessage::ServerMessage(envelope);
474 }
475 types::OPAQUE_PEER => {
476 let public_key = decode_buffer(reader).await?;
477 let has_session_id = reader.read_bool().await?;
478 let session_id = if has_session_id {
479 let session_id = SessionId::from_bytes(
480 reader
481 .read_bytes(16)
482 .await?
483 .as_slice()
484 .try_into()
485 .map_err(encoding_error)?,
486 );
487 Some(session_id)
488 } else {
489 None
490 };
491
492 let mut envelope: SealedEnvelope = Default::default();
493 envelope.decode(reader).await?;
494
495 *self = OpaqueMessage::PeerMessage {
496 public_key,
497 session_id,
498 envelope,
499 };
500 }
501 _ => {
502 return Err(encoding_error(
503 crate::Error::EncodingKind(id),
504 ))
505 }
506 }
507 Ok(())
508 }
509}
510
511#[async_trait]
515impl Encodable for RequestMessage {
516 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
517 &self,
518 writer: &mut BinaryWriter<W>,
519 ) -> Result<()> {
520 encode_preamble(writer).await?;
521 let id: u8 = self.into();
522 writer.write_u8(id).await?;
523 match self {
524 Self::Transparent(message) => {
525 message.encode(writer).await?;
526 }
527 Self::Opaque(message) => {
528 message.encode(writer).await?;
529 }
530 Self::Noop => unreachable!(),
531 }
532 Ok(())
533 }
534}
535
536#[async_trait]
540impl Decodable for RequestMessage {
541 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
542 &mut self,
543 reader: &mut BinaryReader<R>,
544 ) -> Result<()> {
545 decode_preamble(reader).await?;
546 let id = reader.read_u8().await?;
547 match id {
548 types::TRANSPARENT => {
549 let mut message: TransparentMessage =
550 Default::default();
551 message.decode(reader).await?;
552 *self = RequestMessage::Transparent(message);
553 }
554 types::OPAQUE => {
555 let mut message: OpaqueMessage = Default::default();
556 message.decode(reader).await?;
557 *self = RequestMessage::Opaque(message);
558 }
559 _ => {
560 return Err(encoding_error(
561 crate::Error::EncodingKind(id),
562 ))
563 }
564 }
565 Ok(())
566 }
567}
568
569#[async_trait]
573impl Encodable for ResponseMessage {
574 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
575 &self,
576 writer: &mut BinaryWriter<W>,
577 ) -> Result<()> {
578 encode_preamble(writer).await?;
579 let id: u8 = self.into();
580 writer.write_u8(id).await?;
581 match self {
582 Self::Transparent(message) => {
583 message.encode(&mut *writer).await?;
584 }
585 Self::Opaque(message) => {
586 message.encode(&mut *writer).await?;
587 }
588 Self::Noop => unreachable!(),
589 }
590 Ok(())
591 }
592}
593
594#[async_trait]
598impl Decodable for ResponseMessage {
599 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
600 &mut self,
601 reader: &mut BinaryReader<R>,
602 ) -> Result<()> {
603 decode_preamble(reader).await?;
604 let id = reader.read_u8().await?;
605 match id {
606 types::TRANSPARENT => {
607 let mut message: TransparentMessage =
608 Default::default();
609 message.decode(reader).await?;
610 *self = ResponseMessage::Transparent(message);
611 }
612 types::OPAQUE => {
613 let mut message: OpaqueMessage = Default::default();
614 message.decode(reader).await?;
615 *self = ResponseMessage::Opaque(message);
616 }
617 _ => {
618 return Err(encoding_error(
619 crate::Error::EncodingKind(id),
620 ))
621 }
622 }
623 Ok(())
624 }
625}
626
627#[async_trait]
628impl Encodable for Chunk {
629 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
630 &self,
631 writer: &mut BinaryWriter<W>,
632 ) -> Result<()> {
633 encode_payload(writer, &self.length, &self.contents).await?;
634 Ok(())
635 }
636}
637
638#[async_trait]
639impl Decodable for Chunk {
640 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
641 &mut self,
642 reader: &mut BinaryReader<R>,
643 ) -> Result<()> {
644 let (length, contents) = decode_payload(reader).await?;
645 self.length = length;
646 self.contents = contents;
647 Ok(())
648 }
649}
650
651#[async_trait]
652impl Encodable for SealedEnvelope {
653 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
654 &self,
655 writer: &mut BinaryWriter<W>,
656 ) -> Result<()> {
657 let id: u8 = self.encoding.into();
658 writer.write_u8(id).await?;
659 writer.write_bool(self.broadcast).await?;
660
661 writer.write_u32(self.chunks.len() as u32).await?;
662 for chunk in &self.chunks {
663 chunk.encode(writer).await?;
664 }
665 Ok(())
666 }
667}
668
669#[async_trait]
670impl Decodable for SealedEnvelope {
671 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
672 &mut self,
673 reader: &mut BinaryReader<R>,
674 ) -> Result<()> {
675 let id = reader.read_u8().await?;
676 match id {
677 types::ENCODING_BLOB => {
678 self.encoding = Encoding::Blob;
679 }
680 types::ENCODING_JSON => {
681 self.encoding = Encoding::Json;
682 }
683 _ => {
684 return Err(encoding_error(
685 crate::Error::EncodingKind(id),
686 ))
687 }
688 }
689 self.broadcast = reader.read_bool().await?;
690
691 let num_chunks = reader.read_u32().await?;
692 for _ in 0..num_chunks {
693 let mut chunk: Chunk = Default::default();
694 chunk.decode(&mut *reader).await?;
695 self.chunks.push(chunk);
696 }
697
698 Ok(())
699 }
700}
701
702#[async_trait]
703impl Encodable for SessionRequest {
704 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
705 &self,
706 writer: &mut BinaryWriter<W>,
707 ) -> Result<()> {
708 writer.write_u16(self.participant_keys.len() as u16).await?;
710 for key in self.participant_keys.iter() {
711 encode_buffer(writer, key).await?;
712 }
713 Ok(())
714 }
715}
716
717#[async_trait]
718impl Decodable for SessionRequest {
719 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
720 &mut self,
721 reader: &mut BinaryReader<R>,
722 ) -> Result<()> {
723 let size = reader.read_u16().await? as usize;
724 for _ in 0..size {
725 let key = decode_buffer(reader).await?;
726 self.participant_keys.push(key);
727 }
728 Ok(())
729 }
730}
731
732#[async_trait]
733impl Encodable for MeetingState {
734 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
735 &self,
736 writer: &mut BinaryWriter<W>,
737 ) -> Result<()> {
738 writer.write_bytes(self.meeting_id.as_bytes()).await?;
739 writer
740 .write_u16(self.registered_participants.len() as u16)
741 .await?;
742 for key in &self.registered_participants {
743 encode_buffer(writer, key).await?;
744 }
745 writer
746 .write_string(serde_json::to_string(&self.data)?)
747 .await?;
748 Ok(())
749 }
750}
751
752#[async_trait]
753impl Decodable for MeetingState {
754 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
755 &mut self,
756 reader: &mut BinaryReader<R>,
757 ) -> Result<()> {
758 self.meeting_id = MeetingId::from_bytes(
759 reader
760 .read_bytes(16)
761 .await?
762 .as_slice()
763 .try_into()
764 .map_err(encoding_error)?,
765 );
766 let size = reader.read_u16().await? as usize;
767 for _ in 0..size {
768 let key = decode_buffer(reader).await?;
769 self.registered_participants.push(key);
770 }
771
772 let data = reader.read_string().await?;
773 self.data = serde_json::from_str(&data)?;
774
775 Ok(())
776 }
777}
778
779#[async_trait]
780impl Encodable for SessionState {
781 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
782 &self,
783 writer: &mut BinaryWriter<W>,
784 ) -> Result<()> {
785 writer.write_bytes(self.session_id.as_bytes()).await?;
786 writer.write_u16(self.all_participants.len() as u16).await?;
787 for key in &self.all_participants {
788 encode_buffer(writer, key).await?;
789 }
790 Ok(())
791 }
792}
793
794#[async_trait]
795impl Decodable for SessionState {
796 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
797 &mut self,
798 reader: &mut BinaryReader<R>,
799 ) -> Result<()> {
800 self.session_id = SessionId::from_bytes(
801 reader
802 .read_bytes(16)
803 .await?
804 .as_slice()
805 .try_into()
806 .map_err(encoding_error)?,
807 );
808 let size = reader.read_u16().await? as usize;
809 for _ in 0..size {
810 let key = decode_buffer(reader).await?;
811 self.all_participants.push(key);
812 }
813 Ok(())
814 }
815}