polysig_protocol/encoding/v1/
mod.rs1use async_trait::async_trait;
2use binary_stream::futures::{
3 BinaryReader, BinaryWriter, Decodable, Encodable,
4};
5use futures::io::{AsyncRead, AsyncSeek, AsyncWrite};
6use std::io::Result;
7
8use crate::{
9 encoding::{
10 decode_preamble, encode_preamble, encoding_error, types,
11 MAX_BUFFER_SIZE,
12 },
13 Chunk, Encoding, Error, HandshakeMessage, OpaqueMessage,
14 RequestMessage, ResponseMessage, SealedEnvelope, ServerMessage,
15 SessionId, SessionRequest, SessionState, TransparentMessage,
16};
17
18pub const VERSION: u16 = 1;
20
21async fn encode_buffer<W: AsyncWrite + AsyncSeek + Unpin + Send>(
23 writer: &mut BinaryWriter<W>,
24 buffer: &[u8],
25) -> Result<()> {
26 if buffer.len() > MAX_BUFFER_SIZE {
27 return Err(encoding_error(Error::MaxBufferSize(
28 MAX_BUFFER_SIZE,
29 )));
30 }
31 writer.write_u16(buffer.len() as u16).await?;
32 writer.write_bytes(buffer).await?;
33 Ok(())
34}
35
36async fn decode_buffer<R: AsyncRead + AsyncSeek + Unpin + Send>(
38 reader: &mut BinaryReader<R>,
39) -> Result<Vec<u8>> {
40 let size = reader.read_u16().await?;
41 let buf = reader.read_bytes(size as usize).await?;
42 Ok(buf)
43}
44
45async fn encode_payload<W: AsyncWrite + AsyncSeek + Unpin + Send>(
48 writer: &mut BinaryWriter<W>,
49 length: &usize,
50 buffer: &[u8],
51) -> Result<()> {
52 if *length > MAX_BUFFER_SIZE {
53 return Err(encoding_error(Error::MaxBufferSize(
54 MAX_BUFFER_SIZE,
55 )));
56 }
57 writer.write_u16(*length as u16).await?;
58 encode_buffer(writer, buffer).await?;
59 Ok(())
60}
61
62async fn decode_payload<R: AsyncRead + AsyncSeek + Unpin + Send>(
65 reader: &mut BinaryReader<R>,
66) -> Result<(usize, Vec<u8>)> {
67 let length = reader.read_u16().await? as usize;
68 let buffer = decode_buffer(reader).await?;
69 Ok((length, buffer))
70}
71
72#[async_trait]
75impl Encodable for HandshakeMessage {
76 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
77 &self,
78 writer: &mut BinaryWriter<W>,
79 ) -> Result<()> {
80 let id: u8 = self.into();
81 writer.write_u8(id).await?;
82 match self {
83 Self::Initiator(len, buf) => {
84 encode_payload(writer, len, buf).await?;
85 }
86 Self::Responder(len, buf) => {
87 encode_payload(writer, len, buf).await?;
88 }
89 Self::Noop => unreachable!(),
90 }
91 Ok(())
92 }
93}
94
95#[async_trait]
99impl Decodable for HandshakeMessage {
100 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
101 &mut self,
102 reader: &mut BinaryReader<R>,
103 ) -> Result<()> {
104 let id = reader.read_u8().await?;
105 match id {
106 types::HANDSHAKE_INITIATOR => {
107 let (len, buf) = decode_payload(reader).await?;
108 *self = HandshakeMessage::Initiator(len, buf);
109 }
110 types::HANDSHAKE_RESPONDER => {
111 let (len, buf) = decode_payload(reader).await?;
112 *self = HandshakeMessage::Responder(len, buf);
113 }
114 _ => {
115 return Err(encoding_error(
116 crate::Error::EncodingKind(id),
117 ))
118 }
119 }
120 Ok(())
121 }
122}
123
124#[async_trait]
128impl Encodable for TransparentMessage {
129 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
130 &self,
131 writer: &mut BinaryWriter<W>,
132 ) -> Result<()> {
133 let id: u8 = self.into();
134 writer.write_u8(id).await?;
135 match self {
136 Self::Error(code, message) => {
137 let code: u16 = (*code).into();
138 writer.write_u16(code).await?;
139 writer.write_string(message).await?;
140 }
141 Self::ServerHandshake(message) => {
142 message.encode(writer).await?;
143 }
144 Self::PeerHandshake {
145 public_key,
146 message,
147 } => {
148 encode_buffer(writer, public_key).await?;
149 message.encode(writer).await?;
150 }
151 Self::Noop => unreachable!(),
152 }
153 Ok(())
154 }
155}
156
157#[async_trait]
161impl Decodable for TransparentMessage {
162 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
163 &mut self,
164 reader: &mut BinaryReader<R>,
165 ) -> Result<()> {
166 let id = reader.read_u8().await?;
167 match id {
168 types::ERROR => {
169 let code = reader
170 .read_u16()
171 .await?
172 .try_into()
173 .map_err(encoding_error)?;
174 let message = reader.read_string().await?;
175 *self = TransparentMessage::Error(code, message);
176 }
177 types::HANDSHAKE_SERVER => {
178 let mut message: HandshakeMessage =
179 Default::default();
180 message.decode(reader).await?;
181 *self = TransparentMessage::ServerHandshake(message);
182 }
183 types::HANDSHAKE_PEER => {
184 let public_key = decode_buffer(reader).await?;
185 let mut message: HandshakeMessage =
186 Default::default();
187 message.decode(reader).await?;
188 *self = TransparentMessage::PeerHandshake {
189 public_key,
190 message,
191 };
192 }
193 _ => {
194 return Err(encoding_error(
195 crate::Error::EncodingKind(id),
196 ))
197 }
198 }
199 Ok(())
200 }
201}
202
203#[async_trait]
207impl Encodable for ServerMessage {
208 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
209 &self,
210 writer: &mut BinaryWriter<W>,
211 ) -> Result<()> {
212 let id: u8 = self.into();
213 writer.write_u8(id).await?;
214 match self {
215 Self::Error(code, message) => {
216 let code: u16 = (*code).into();
217 writer.write_u16(code).await?;
218 writer.write_string(message).await?;
219 }
220 Self::NewSession(request) => {
221 request.encode(writer).await?;
222 }
223 Self::SessionConnection {
224 session_id,
225 peer_key,
226 } => {
227 writer.write_bytes(session_id.as_bytes()).await?;
228 encode_buffer(writer, peer_key).await?;
229 }
230 Self::SessionCreated(response) => {
231 response.encode(writer).await?;
232 }
233 Self::SessionReady(response) => {
234 response.encode(writer).await?;
235 }
236 Self::SessionActive(response) => {
237 response.encode(writer).await?;
238 }
239 Self::SessionTimeout(session_id) => {
240 writer.write_bytes(session_id.as_bytes()).await?;
241 }
242 Self::CloseSession(session_id) => {
243 writer.write_bytes(session_id.as_bytes()).await?;
244 }
245 Self::SessionFinished(session_id) => {
246 writer.write_bytes(session_id.as_bytes()).await?;
247 }
248 Self::Noop => unreachable!(),
249 }
250 Ok(())
251 }
252}
253
254#[async_trait]
258impl Decodable for ServerMessage {
259 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
260 &mut self,
261 reader: &mut BinaryReader<R>,
262 ) -> Result<()> {
263 let id = reader.read_u8().await?;
264 match id {
265 types::ERROR => {
266 let code = reader
267 .read_u16()
268 .await?
269 .try_into()
270 .map_err(encoding_error)?;
271 let message = reader.read_string().await?;
272 *self = ServerMessage::Error(code, message);
273 }
274 types::SESSION_NEW => {
275 let mut session: SessionRequest = Default::default();
276 session.decode(reader).await?;
277 *self = ServerMessage::NewSession(session);
278 }
279 types::SESSION_CONNECTION => {
280 let session_id = SessionId::from_bytes(
281 reader
282 .read_bytes(16)
283 .await?
284 .as_slice()
285 .try_into()
286 .map_err(encoding_error)?,
287 );
288 let peer_key = decode_buffer(reader).await?;
289
290 *self = ServerMessage::SessionConnection {
291 session_id,
292 peer_key,
293 };
294 }
295 types::SESSION_CREATED => {
296 let mut session: SessionState = Default::default();
297 session.decode(reader).await?;
298 *self = ServerMessage::SessionCreated(session);
299 }
300 types::SESSION_READY => {
301 let mut session: SessionState = Default::default();
302 session.decode(reader).await?;
303 *self = ServerMessage::SessionReady(session);
304 }
305 types::SESSION_ACTIVE => {
306 let mut session: SessionState = Default::default();
307 session.decode(reader).await?;
308 *self = ServerMessage::SessionActive(session);
309 }
310 types::SESSION_TIMEOUT => {
311 let session_id = SessionId::from_bytes(
312 reader
313 .read_bytes(16)
314 .await?
315 .as_slice()
316 .try_into()
317 .map_err(encoding_error)?,
318 );
319 *self = ServerMessage::SessionTimeout(session_id);
320 }
321 types::SESSION_CLOSE => {
322 let session_id = SessionId::from_bytes(
323 reader
324 .read_bytes(16)
325 .await?
326 .as_slice()
327 .try_into()
328 .map_err(encoding_error)?,
329 );
330 *self = ServerMessage::CloseSession(session_id);
331 }
332 types::SESSION_FINISHED => {
333 let session_id = SessionId::from_bytes(
334 reader
335 .read_bytes(16)
336 .await?
337 .as_slice()
338 .try_into()
339 .map_err(encoding_error)?,
340 );
341 *self = ServerMessage::SessionFinished(session_id);
342 }
343 _ => {
344 return Err(encoding_error(
345 crate::Error::EncodingKind(id),
346 ))
347 }
348 }
349 Ok(())
350 }
351}
352
353#[async_trait]
357impl Encodable for OpaqueMessage {
358 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
359 &self,
360 writer: &mut BinaryWriter<W>,
361 ) -> Result<()> {
362 let id: u8 = self.into();
363 writer.write_u8(id).await?;
364 match self {
365 Self::ServerMessage(envelope) => {
366 envelope.encode(writer).await?;
367 }
368 Self::PeerMessage {
369 public_key,
370 session_id,
371 envelope,
372 } => {
373 encode_buffer(writer, public_key).await?;
374 writer.write_bool(session_id.is_some()).await?;
375 if let Some(id) = session_id {
376 writer.write_bytes(id.as_bytes()).await?;
377 }
378 envelope.encode(writer).await?;
379 }
380 Self::Noop => unreachable!(),
381 }
382 Ok(())
383 }
384}
385
386#[async_trait]
390impl Decodable for OpaqueMessage {
391 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
392 &mut self,
393 reader: &mut BinaryReader<R>,
394 ) -> Result<()> {
395 let id = reader.read_u8().await?;
396 match id {
397 types::OPAQUE_SERVER => {
398 let mut envelope: SealedEnvelope = Default::default();
399 envelope.decode(reader).await?;
400 *self = OpaqueMessage::ServerMessage(envelope);
401 }
402 types::OPAQUE_PEER => {
403 let public_key = decode_buffer(reader).await?;
404 let has_session_id = reader.read_bool().await?;
405 let session_id = if has_session_id {
406 let session_id = SessionId::from_bytes(
407 reader
408 .read_bytes(16)
409 .await?
410 .as_slice()
411 .try_into()
412 .map_err(encoding_error)?,
413 );
414 Some(session_id)
415 } else {
416 None
417 };
418
419 let mut envelope: SealedEnvelope = Default::default();
420 envelope.decode(reader).await?;
421
422 *self = OpaqueMessage::PeerMessage {
423 public_key,
424 session_id,
425 envelope,
426 };
427 }
428 _ => {
429 return Err(encoding_error(
430 crate::Error::EncodingKind(id),
431 ))
432 }
433 }
434 Ok(())
435 }
436}
437
438#[async_trait]
442impl Encodable for RequestMessage {
443 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
444 &self,
445 writer: &mut BinaryWriter<W>,
446 ) -> Result<()> {
447 encode_preamble(writer).await?;
448 let id: u8 = self.into();
449 writer.write_u8(id).await?;
450 match self {
451 Self::Transparent(message) => {
452 message.encode(writer).await?;
453 }
454 Self::Opaque(message) => {
455 message.encode(writer).await?;
456 }
457 Self::Noop => unreachable!(),
458 }
459 Ok(())
460 }
461}
462
463#[async_trait]
467impl Decodable for RequestMessage {
468 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
469 &mut self,
470 reader: &mut BinaryReader<R>,
471 ) -> Result<()> {
472 decode_preamble(reader).await?;
473 let id = reader.read_u8().await?;
474 match id {
475 types::TRANSPARENT => {
476 let mut message: TransparentMessage =
477 Default::default();
478 message.decode(reader).await?;
479 *self = RequestMessage::Transparent(message);
480 }
481 types::OPAQUE => {
482 let mut message: OpaqueMessage = Default::default();
483 message.decode(reader).await?;
484 *self = RequestMessage::Opaque(message);
485 }
486 _ => {
487 return Err(encoding_error(
488 crate::Error::EncodingKind(id),
489 ))
490 }
491 }
492 Ok(())
493 }
494}
495
496#[async_trait]
500impl Encodable for ResponseMessage {
501 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
502 &self,
503 writer: &mut BinaryWriter<W>,
504 ) -> Result<()> {
505 encode_preamble(writer).await?;
506 let id: u8 = self.into();
507 writer.write_u8(id).await?;
508 match self {
509 Self::Transparent(message) => {
510 message.encode(&mut *writer).await?;
511 }
512 Self::Opaque(message) => {
513 message.encode(&mut *writer).await?;
514 }
515 Self::Noop => unreachable!(),
516 }
517 Ok(())
518 }
519}
520
521#[async_trait]
525impl Decodable for ResponseMessage {
526 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
527 &mut self,
528 reader: &mut BinaryReader<R>,
529 ) -> Result<()> {
530 decode_preamble(reader).await?;
531 let id = reader.read_u8().await?;
532 match id {
533 types::TRANSPARENT => {
534 let mut message: TransparentMessage =
535 Default::default();
536 message.decode(reader).await?;
537 *self = ResponseMessage::Transparent(message);
538 }
539 types::OPAQUE => {
540 let mut message: OpaqueMessage = Default::default();
541 message.decode(reader).await?;
542 *self = ResponseMessage::Opaque(message);
543 }
544 _ => {
545 return Err(encoding_error(
546 crate::Error::EncodingKind(id),
547 ))
548 }
549 }
550 Ok(())
551 }
552}
553
554#[async_trait]
555impl Encodable for Chunk {
556 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
557 &self,
558 writer: &mut BinaryWriter<W>,
559 ) -> Result<()> {
560 encode_payload(writer, &self.length, &self.contents).await?;
561 Ok(())
562 }
563}
564
565#[async_trait]
566impl Decodable for Chunk {
567 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
568 &mut self,
569 reader: &mut BinaryReader<R>,
570 ) -> Result<()> {
571 let (length, contents) = decode_payload(reader).await?;
572 self.length = length;
573 self.contents = contents;
574 Ok(())
575 }
576}
577
578#[async_trait]
579impl Encodable for SealedEnvelope {
580 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
581 &self,
582 writer: &mut BinaryWriter<W>,
583 ) -> Result<()> {
584 let id: u8 = self.encoding.into();
585 writer.write_u8(id).await?;
586 writer.write_bool(self.broadcast).await?;
587
588 writer.write_u32(self.chunks.len() as u32).await?;
589 for chunk in &self.chunks {
590 chunk.encode(writer).await?;
591 }
592 Ok(())
593 }
594}
595
596#[async_trait]
597impl Decodable for SealedEnvelope {
598 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
599 &mut self,
600 reader: &mut BinaryReader<R>,
601 ) -> Result<()> {
602 let id = reader.read_u8().await?;
603 match id {
604 types::ENCODING_BLOB => {
605 self.encoding = Encoding::Blob;
606 }
607 types::ENCODING_JSON => {
608 self.encoding = Encoding::Json;
609 }
610 _ => {
611 return Err(encoding_error(
612 crate::Error::EncodingKind(id),
613 ))
614 }
615 }
616 self.broadcast = reader.read_bool().await?;
617
618 let num_chunks = reader.read_u32().await?;
619 for _ in 0..num_chunks {
620 let mut chunk: Chunk = Default::default();
621 chunk.decode(&mut *reader).await?;
622 self.chunks.push(chunk);
623 }
624
625 Ok(())
626 }
627}
628
629#[async_trait]
630impl Encodable for SessionRequest {
631 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
632 &self,
633 writer: &mut BinaryWriter<W>,
634 ) -> Result<()> {
635 writer.write_u16(self.participant_keys.len() as u16).await?;
637 for key in self.participant_keys.iter() {
638 encode_buffer(writer, key).await?;
639 }
640 Ok(())
641 }
642}
643
644#[async_trait]
645impl Decodable for SessionRequest {
646 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
647 &mut self,
648 reader: &mut BinaryReader<R>,
649 ) -> Result<()> {
650 let size = reader.read_u16().await? as usize;
651 for _ in 0..size {
652 let key = decode_buffer(reader).await?;
653 self.participant_keys.push(key);
654 }
655 Ok(())
656 }
657}
658
659#[async_trait]
660impl Encodable for SessionState {
661 async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
662 &self,
663 writer: &mut BinaryWriter<W>,
664 ) -> Result<()> {
665 writer.write_bytes(self.session_id.as_bytes()).await?;
666 writer.write_u16(self.all_participants.len() as u16).await?;
667 for key in &self.all_participants {
668 encode_buffer(writer, key).await?;
669 }
670 Ok(())
671 }
672}
673
674#[async_trait]
675impl Decodable for SessionState {
676 async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
677 &mut self,
678 reader: &mut BinaryReader<R>,
679 ) -> Result<()> {
680 self.session_id = SessionId::from_bytes(
681 reader
682 .read_bytes(16)
683 .await?
684 .as_slice()
685 .try_into()
686 .map_err(encoding_error)?,
687 );
688 let size = reader.read_u16().await? as usize;
689 for _ in 0..size {
690 let key = decode_buffer(reader).await?;
691 self.all_participants.push(key);
692 }
693 Ok(())
694 }
695}