pyth_lazer_protocol/
binary_update.rs

1use {
2    crate::{message::Message, subscription::SubscriptionId},
3    anyhow::{bail, Context},
4    byteorder::{WriteBytesExt, BE, LE},
5};
6
/// First bytes (LE) of a binary Websocket message. A binary message will
/// contain one or multiple price updates, each with its encoding format magic.
///
/// [`BinaryWsUpdate::serialize`] writes this value as a little-endian `u32` before
/// anything else, and [`BinaryWsUpdate::deserialize_slice`] rejects any input whose
/// first four bytes decode (little-endian) to a different value.
pub const BINARY_UPDATE_FORMAT_MAGIC: u32 = 461928307;
10
/// Content of a Websocket update sent to the client when the binary delivery method
/// is requested.
///
/// The binary encoding produced by `serialize` is the format magic (LE `u32`),
/// followed by `subscription_id` (BE `u64`), followed by every entry of `messages`,
/// each preceded by its encoded length (BE `u16`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BinaryWsUpdate {
    /// Subscription id, encoded as a big-endian `u64` right after the format magic.
    /// NOTE(review): presumably identifies the client subscription this update
    /// answers — confirm against the `SubscriptionId` definition.
    pub subscription_id: SubscriptionId,
    /// Messages carried by this update, in wire order. Each one is encoded with a
    /// big-endian `u16` length prefix.
    pub messages: Vec<Message>,
}
18
19impl BinaryWsUpdate {
20    pub fn serialize(&self, buf: &mut Vec<u8>) -> anyhow::Result<()> {
21        buf.write_u32::<LE>(BINARY_UPDATE_FORMAT_MAGIC)?;
22        buf.write_u64::<BE>(self.subscription_id.0)?;
23
24        for message in &self.messages {
25            write_with_len_header(buf, |buf| message.serialize(buf))?;
26        }
27        Ok(())
28    }
29
30    pub fn deserialize_slice(data: &[u8]) -> anyhow::Result<Self> {
31        let mut pos = 0;
32        let magic = u32::from_le_bytes(
33            data.get(pos..pos + 4)
34                .context("data too short")?
35                .try_into()?,
36        );
37        pos += 4;
38
39        if magic != BINARY_UPDATE_FORMAT_MAGIC {
40            bail!("binary update format magic mismatch");
41        }
42
43        let subscription_id = SubscriptionId(u64::from_be_bytes(
44            data.get(pos..pos + 8)
45                .context("data too short")?
46                .try_into()?,
47        ));
48        pos += 8;
49
50        let mut messages = Vec::new();
51
52        while pos < data.len() {
53            let len: usize = u16::from_be_bytes(
54                data.get(pos..pos + 2)
55                    .context("data too short")?
56                    .try_into()?,
57            )
58            .into();
59            pos += 2;
60            let message_data = data.get(pos..pos + len).context("data too short")?;
61            pos += len;
62            messages.push(Message::deserialize_slice(message_data)?);
63        }
64
65        Ok(Self {
66            subscription_id,
67            messages,
68        })
69    }
70}
71
72/// Performs write operations specified by `f` and inserts the length header before them.
73/// The length is written as a BE u16.
74fn write_with_len_header(
75    out: &mut Vec<u8>,
76    f: impl FnOnce(&mut Vec<u8>) -> anyhow::Result<()>,
77) -> anyhow::Result<()> {
78    let len_index = out.len();
79    // Make space for size.
80    out.push(0);
81    out.push(0);
82    let data_start_index = out.len();
83    f(out)?;
84    let len = out.len() - data_start_index;
85    let len: u16 = len.try_into()?;
86    out[len_index..data_start_index].copy_from_slice(&len.to_be_bytes());
87
88    Ok(())
89}