ax_core 0.3.2

Core library implementing the functions of ax
Documentation
use ax_types::{AppId, NodeId, Payload, Tag, TagSet, Timestamp};
use cbor_data::{index_str, value::Number, Cbor, CborBuilder, Encoder};
use chrono::{DateTime, FixedOffset};
use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use libp2p::{core::ProtocolName, request_response::RequestResponseCodec};
use serde_json::Value;
use std::{
    collections::BTreeMap,
    convert::TryFrom,
    io::{Error, ErrorKind, Result},
};

#[derive(Clone, PartialEq, Eq)]
pub enum BanyanRequest {
    MakeFreshTopic(String),
    AppendEvents(String, Vec<u8>),
    Finalise(String),
    Future,
}

impl std::fmt::Debug for BanyanRequest {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::MakeFreshTopic(arg0) => f.debug_tuple("MakeFreshTopic").field(arg0).finish(),
            Self::AppendEvents(arg0, arg1) => f.debug_tuple("AppendEvents").field(arg0).field(&arg1.len()).finish(),
            Self::Finalise(arg0) => f.debug_tuple("Finalise").field(arg0).finish(),
            Self::Future => write!(f, "Future"),
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BanyanResponse {
    Ok,
    Error(String),
    Future,
}

impl<E: std::fmt::Display> From<std::result::Result<(), E>> for BanyanResponse {
    fn from(r: std::result::Result<(), E>) -> Self {
        match r {
            Ok(_) => BanyanResponse::Ok,
            Err(e) => BanyanResponse::Error(e.to_string()),
        }
    }
}

#[derive(Clone, Copy, PartialEq, Eq)]
pub struct BanyanProtocolName;
impl ProtocolName for BanyanProtocolName {
    fn protocol_name(&self) -> &[u8] {
        b"/actyx/banyan/create"
    }
}

#[derive(Default, Clone, Debug)]
pub struct BanyanProtocol {
    buf: Vec<u8>,
}

#[async_trait::async_trait]
impl RequestResponseCodec for BanyanProtocol {
    type Protocol = BanyanProtocolName;
    type Request = BanyanRequest;
    type Response = BanyanResponse;

    async fn read_request<T: AsyncRead + Send + Unpin>(
        &mut self,
        _protocol: &Self::Protocol,
        io: &mut T,
    ) -> Result<Self::Request> {
        let mut len_bytes = [0u8; 4];
        io.read_exact(&mut len_bytes).await?;
        let len = u32::from_be_bytes(len_bytes);

        self.buf.resize(len as usize, 0);
        io.read_exact(self.buf.as_mut()).await?;

        let cbor = Cbor::checked(self.buf.as_ref()).map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
        (|| {
            let arr = cbor.decode().to_array()?;
            match arr.get(0)?.decode().to_number()? {
                Number::Int(1) => Some(BanyanRequest::MakeFreshTopic(
                    arr.get(1)?.decode().to_str()?.into_owned(),
                )),
                Number::Int(2) => Some(BanyanRequest::AppendEvents(
                    arr.get(1)?.decode().to_str()?.into_owned(),
                    arr.get(2)?.decode().to_bytes()?.into_owned(),
                )),
                Number::Int(3) => Some(BanyanRequest::Finalise(arr.get(1)?.decode().to_str()?.into_owned())),
                _ => Some(BanyanRequest::Future),
            }
        })()
        .ok_or_else(|| Error::new(ErrorKind::InvalidData, "expected array"))
    }

    async fn read_response<T: AsyncRead + Send + Unpin>(
        &mut self,
        _protocol: &Self::Protocol,
        io: &mut T,
    ) -> Result<Self::Response> {
        let mut len_bytes = [0u8; 4];
        io.read_exact(&mut len_bytes).await?;
        let len = u32::from_be_bytes(len_bytes);

        self.buf.resize(len as usize, 0);
        io.read_exact(self.buf.as_mut()).await?;

        let cbor = Cbor::checked(self.buf.as_ref()).map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
        (|| {
            let arr = cbor.decode().to_array()?;
            match arr.get(0)?.decode().to_number()? {
                Number::Int(1) => Some(BanyanResponse::Ok),
                Number::Int(2) => Some(BanyanResponse::Error(arr.get(1)?.decode().to_str()?.into_owned())),
                _ => Some(BanyanResponse::Future),
            }
        })()
        .ok_or_else(|| Error::new(ErrorKind::InvalidData, "expected array"))
    }

    async fn write_request<T: AsyncWrite + Send + Unpin>(
        &mut self,
        _protocol: &Self::Protocol,
        io: &mut T,
        req: Self::Request,
    ) -> Result<()> {
        let cbor = CborBuilder::with_scratch_space(&mut self.buf).encode_array(move |b| match req {
            BanyanRequest::MakeFreshTopic(topic) => {
                b.encode_u64(1);
                b.encode_str(topic);
            }
            BanyanRequest::AppendEvents(topic, data) => {
                b.encode_u64(2);
                b.encode_str(topic);
                b.encode_bytes(data);
            }
            BanyanRequest::Finalise(topic) => {
                b.encode_u64(3);
                b.encode_str(topic);
            }
            BanyanRequest::Future => unreachable!(),
        });
        let len_bytes = u32::try_from(cbor.as_slice().len())
            .map_err(|_| Error::new(ErrorKind::InvalidInput, "cannot enocde more than 4GiB"))?
            .to_be_bytes();
        io.write_all(&len_bytes).await?;
        io.write_all(cbor.as_slice()).await?;
        Ok(())
    }

    async fn write_response<T: AsyncWrite + Send + Unpin>(
        &mut self,
        _protocol: &Self::Protocol,
        io: &mut T,
        res: Self::Response,
    ) -> Result<()> {
        let cbor = CborBuilder::with_scratch_space(&mut self.buf).encode_array(move |b| match res {
            BanyanResponse::Ok => {
                b.encode_u64(1);
            }
            BanyanResponse::Error(error) => {
                b.encode_u64(2);
                b.encode_str(error);
            }
            BanyanResponse::Future => unreachable!(),
        });
        let len_bytes = u32::try_from(cbor.as_slice().len())
            .map_err(|_| Error::new(ErrorKind::InvalidInput, "cannot enocde more than 4GiB"))?
            .to_be_bytes();
        io.write_all(&len_bytes).await?;
        io.write_all(cbor.as_slice()).await?;
        Ok(())
    }
}

pub fn decode_dump_frame(cbor: &Cbor) -> Option<(NodeId, AppId, Timestamp, TagSet, Payload)> {
    let orig_node = NodeId::from_bytes(cbor.index(index_str("stream[0]"))?.decode().to_bytes()?.as_ref()).ok()?;
    let app_id = AppId::try_from(cbor.index(index_str("appId"))?.decode().to_str()?.as_ref()).ok()?;
    let timestamp = cbor.index(index_str("timestamp"))?;
    let timestamp = match timestamp.decode().to_number()? {
        Number::Int(t) => Timestamp::from(u64::try_from(t).ok()?),
        _ => return None,
    };
    let tags = cbor.index(index_str("tags"))?.decode().to_array().map(|tags| {
        tags.into_iter()
            .filter_map(|cbor| cbor.decode().to_str().and_then(|s| Tag::try_from(s.as_ref()).ok()))
            .collect()
    })?;
    let payload = Payload::from_bytes(cbor.index(index_str("payload"))?.as_slice());
    Some((orig_node, app_id, timestamp, tags, payload))
}

pub fn decode_dump_header(cbor: &Cbor) -> Option<(NodeId, String, DateTime<FixedOffset>)> {
    let dict = cbor.decode().to_dict()?;
    let dict = dict
        .iter()
        .filter_map(|(k, v)| k.decode().to_str().map(|k| (k, v.decode())))
        .collect::<BTreeMap<_, _>>();
    let node_id = NodeId::from_bytes(dict.get("nodeId")?.as_bytes()?.as_ref()).ok()?;
    let display_name = dict.get("displayName")?.as_str()?;
    let timestamp = DateTime::<FixedOffset>::try_from(dict.get("timestamp")?.as_timestamp()?).ok()?;
    let settings = dict.get("settings")?.as_str()?;
    let topic = serde_json::from_str::<Value>(settings.as_ref())
        .ok()?
        .pointer("/swarm/topic")?
        .as_str()?
        .to_owned();
    tracing::info!(
        "reading dump from `{}` (topic {}) taken at {}",
        display_name,
        topic,
        timestamp
    );
    Some((node_id, topic, timestamp))
}

#[cfg(test)]
mod tests {
    use super::*;
    use BanyanRequest::*;
    use BanyanResponse::*;

    #[tokio::test]
    async fn roundtrip() {
        let mut p = BanyanProtocol::default();
        let c = BanyanProtocolName;
        let mut v = Vec::new();

        p.write_request(&c, &mut v, MakeFreshTopic("hello".into()))
            .await
            .unwrap();
        let req = p.read_request(&c, &mut v.as_slice()).await.unwrap();
        assert_eq!(req, MakeFreshTopic("hello".into()));

        v.clear();
        p.write_request(&c, &mut v, AppendEvents("hello".into(), vec![1, 2, 3, 4, 5]))
            .await
            .unwrap();
        let req = p.read_request(&c, &mut v.as_slice()).await.unwrap();
        assert_eq!(req, AppendEvents("hello".into(), vec![1, 2, 3, 4, 5]));

        v.clear();
        p.write_request(&c, &mut v, Finalise("hello".into())).await.unwrap();
        let req = p.read_request(&c, &mut v.as_slice()).await.unwrap();
        assert_eq!(req, Finalise("hello".into()));

        let cbor = CborBuilder::new().encode_array(|b| {
            b.encode_u64(42);
            b.encode_null();
        });
        v.clear();
        v.extend_from_slice(&(cbor.as_slice().len() as u32).to_be_bytes());
        v.extend_from_slice(cbor.as_slice());
        let req = p.read_request(&c, &mut v.as_slice()).await.unwrap();
        assert_eq!(req, BanyanRequest::Future);

        v.clear();
        p.write_response(&c, &mut v, Ok).await.unwrap();
        let res = p.read_response(&c, &mut v.as_slice()).await.unwrap();
        assert_eq!(res, Ok);

        v.clear();
        p.write_response(&c, &mut v, Error("soso".into())).await.unwrap();
        let res = p.read_response(&c, &mut v.as_slice()).await.unwrap();
        assert_eq!(res, Error("soso".into()));

        v.clear();
        v.extend_from_slice(&(cbor.as_slice().len() as u32).to_be_bytes());
        v.extend_from_slice(cbor.as_slice());
        let req = p.read_request(&c, &mut v.as_slice()).await.unwrap();
        assert_eq!(req, BanyanRequest::Future);
    }
}