/// Size of a frame header.
///
/// NOTE(review): presumably a byte count; the header layout itself is not
/// visible in this part of the file — confirm against the framing code.
pub const FRAME_HEADER_SIZE: usize = 5;
/// Discriminates the two kinds of frame: a message frame or a table frame.
///
/// The type is a plain field-less enum, so it is cheap to copy and compare.
/// The variant names mirror those of [`LightstreamMessage`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FrameKind {
    /// A message frame.
    Message,
    /// A table frame.
    Table,
}
/// A tagged unit of data that is either a raw byte message or a table.
///
/// Both variants carry a one-byte `tag`. The meaning of the tag is not
/// defined in this part of the file (presumably assigned by the protocol or
/// the caller — confirm against the code that builds these values).
#[derive(Debug)]
pub enum LightstreamMessage {
/// A message carrying an owned, uninterpreted byte payload.
Message {
/// Tag associated with this message.
tag: u8,
/// Raw payload bytes, owned by the message.
payload: Vec<u8>,
},
/// A message carrying a `minarrow::Table`.
Table {
/// Tag associated with this table.
tag: u8,
/// The table itself, owned by the message.
table: minarrow::Table,
},
}
impl LightstreamMessage {
    /// Returns the `tag` byte stored in this value, whichever variant it is.
    pub fn tag(&self) -> u8 {
        match *self {
            Self::Message { tag, .. } => tag,
            Self::Table { tag, .. } => tag,
        }
    }

    /// Borrows the payload bytes of a `Message`.
    ///
    /// Returns `None` for the `Table` variant.
    pub fn payload(&self) -> Option<&[u8]> {
        if let Self::Message { payload, .. } = self {
            Some(payload.as_slice())
        } else {
            None
        }
    }

    /// Consumes `self` and returns the owned payload of a `Message`.
    ///
    /// Returns `None` for the `Table` variant.
    pub fn into_payload(self) -> Option<Vec<u8>> {
        if let Self::Message { payload, .. } = self {
            return Some(payload);
        }
        None
    }

    /// Borrows the table of a `Table`.
    ///
    /// Returns `None` for the `Message` variant.
    pub fn table(&self) -> Option<&minarrow::Table> {
        if let Self::Table { table, .. } = self {
            Some(table)
        } else {
            None
        }
    }

    /// Consumes `self` and returns the owned table of a `Table`.
    ///
    /// Returns `None` for the `Message` variant.
    pub fn into_table(self) -> Option<minarrow::Table> {
        if let Self::Table { table, .. } = self {
            return Some(table);
        }
        None
    }

    /// Returns `true` when this value is the `Message` variant.
    pub fn is_message(&self) -> bool {
        match self {
            Self::Message { .. } => true,
            Self::Table { .. } => false,
        }
    }

    /// Returns `true` when this value is the `Table` variant.
    pub fn is_table(&self) -> bool {
        match self {
            Self::Table { .. } => true,
            Self::Message { .. } => false,
        }
    }

    /// Decodes the borrowed payload as a protobuf message of type `M`.
    ///
    /// # Errors
    ///
    /// * `InvalidInput` ("not a message variant") if `self` has no payload,
    ///   i.e. it is not the `Message` variant.
    /// * `InvalidData`, wrapping the decoder's error, if `M::decode` fails.
    #[cfg(feature = "protobuf")]
    pub fn decode_payload<M>(&self) -> std::io::Result<M>
    where
        M: prost::Message + Default,
    {
        use std::io::{Error, ErrorKind};

        let raw = match self.payload() {
            Some(raw) => raw,
            None => return Err(Error::new(ErrorKind::InvalidInput, "not a message variant")),
        };
        M::decode(raw).map_err(|err| Error::new(ErrorKind::InvalidData, err))
    }

    /// Consumes `self` and decodes its payload as a protobuf message of type `M`.
    ///
    /// # Errors
    ///
    /// * `InvalidInput` ("not a message variant") if `self` has no payload,
    ///   i.e. it is not the `Message` variant.
    /// * `InvalidData`, wrapping the decoder's error, if `M::decode` fails.
    #[cfg(feature = "protobuf")]
    pub fn into_decoded_payload<M>(self) -> std::io::Result<M>
    where
        M: prost::Message + Default,
    {
        use std::io::{Error, ErrorKind};

        let owned = match self.into_payload() {
            Some(owned) => owned,
            None => return Err(Error::new(ErrorKind::InvalidInput, "not a message variant")),
        };
        M::decode(&owned[..]).map_err(|err| Error::new(ErrorKind::InvalidData, err))
    }

    /// Deserializes the borrowed payload as MessagePack into a value of type `M`.
    ///
    /// # Errors
    ///
    /// * `InvalidInput` ("not a message variant") if `self` has no payload,
    ///   i.e. it is not the `Message` variant.
    /// * `InvalidData`, wrapping the deserializer's error, if
    ///   `rmp_serde::from_slice` fails.
    #[cfg(feature = "msgpack")]
    pub fn decode_msgpack<M>(&self) -> std::io::Result<M>
    where
        M: serde::de::DeserializeOwned,
    {
        use std::io::{Error, ErrorKind};

        let raw = match self.payload() {
            Some(raw) => raw,
            None => return Err(Error::new(ErrorKind::InvalidInput, "not a message variant")),
        };
        rmp_serde::from_slice(raw).map_err(|err| Error::new(ErrorKind::InvalidData, err))
    }

    /// Consumes `self` and deserializes its payload as MessagePack into a
    /// value of type `M`.
    ///
    /// # Errors
    ///
    /// * `InvalidInput` ("not a message variant") if `self` has no payload,
    ///   i.e. it is not the `Message` variant.
    /// * `InvalidData`, wrapping the deserializer's error, if
    ///   `rmp_serde::from_slice` fails.
    #[cfg(feature = "msgpack")]
    pub fn into_decoded_msgpack<M>(self) -> std::io::Result<M>
    where
        M: serde::de::DeserializeOwned,
    {
        use std::io::{Error, ErrorKind};

        let owned = match self.into_payload() {
            Some(owned) => owned,
            None => return Err(Error::new(ErrorKind::InvalidInput, "not a message variant")),
        };
        rmp_serde::from_slice(owned.as_slice())
            .map_err(|err| Error::new(ErrorKind::InvalidData, err))
    }
}