dce-tokio 1.0.0

A tcp/udp routable protocol implementation sample for dce-router
Documentation
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::net::SocketAddr;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use async_trait::async_trait;
use futures_util::SinkExt;
use futures_util::stream::SplitSink;
use tokio::net::TcpStream;
use tokio_util::codec::{BytesCodec, Framed};
use tokio_util::udp::UdpFramed;
use bytes::{BufMut, BytesMut};
use log::{error, warn};
use dce_router::protocol::{HEAD_ID_NAME, HEAD_PATH_NAME, Meta, RoutableProtocol};
use dce_router::request::{Request, Response};
use dce_router::router::Router;
use dce_router::serializer::Serialized;
use dce_util::mixed::{DceErr, DceResult};


pub type SemiTcpRaw<'a> = Request<'a, SemiTcpProtocol, (), ()>;
pub type SemiTcpGet<'a, Dto> = Request<'a, SemiTcpProtocol, (), Dto>;
pub type SemiTcpSame<'a, Dto> = Request<'a, SemiTcpProtocol, Dto, Dto>;
pub type SemiTcp<'a, ReqDto, RespDto> = Request<'a, SemiTcpProtocol, ReqDto, RespDto>;


const ID_PATH_SEPARATOR: char = ';';
const HEAD_BODY_SEPARATOR: &str = ">BODY>>>";


#[derive(Debug)]
pub struct SemiTcpProtocol {
    meta: Meta<BytesMut, BytesMut>,
    body_index: usize,
}

impl SemiTcpProtocol {
    pub async fn route(
        self,
        router: Arc<Router<Self>>,
        stream: &mut SplitSink<Framed<TcpStream, BytesCodec>, BytesMut>,
        context_data: HashMap<String, Box<dyn Any + Send>>,
    ) {
        if let Some(handled) = Self::handle(self, router, context_data).await {
            let _ = stream.send(handled).await.map_err(|e| error!("{e}"));
        }
    }

    pub async fn udp_route(
        self,
        router: Arc<Router<Self>>,
        stream: &mut SplitSink<UdpFramed<BytesCodec>, (BytesMut, SocketAddr)>,
        addr: SocketAddr,
        context_data: HashMap<String, Box<dyn Any + Send>>,
    ) {
        if let Some(handled) = Self::handle(self, router, context_data).await {
            let _ = stream.send((handled, addr)).await.map_err(|e| error!("{e}"));
        }
    }
}

impl From<BytesMut> for SemiTcpProtocol {
    fn from(value: BytesMut) -> Self {
        let mut body_index = 0;
        let mut heads = HashMap::new();
        let mut path = String::from_utf8(value.to_vec()).map_err(|e| warn!("{e}")).map_or(Default::default(), |v| v.trim().to_string());
        if let Some(index) = path.find(HEAD_BODY_SEPARATOR) {
            let mut head_lines: Vec<_> = path[0..index].lines().map(ToString::to_string).collect();
            path = head_lines.remove(0);
            if let Some((tmp_id, tmp_path)) = path.split_once(ID_PATH_SEPARATOR) {
                heads.insert(HEAD_ID_NAME.to_string(), tmp_id.to_string());
                path = tmp_path.to_string();
            }
            heads.extend(head_lines.iter().map(|line| line.split_once(':')

                .map_or_else(|| (line.to_string(), "".to_string()), |(k, v)| (k.to_string(), v.to_string()))));
            body_index = index + HEAD_BODY_SEPARATOR.len();
        }
        heads.insert(HEAD_PATH_NAME.to_string(), path);
        Self { meta: Meta::new(value, heads), body_index, }
    }
}

impl Into<BytesMut> for SemiTcpProtocol {
    fn into(mut self) -> BytesMut {
        let resp = self.meta.resp_mut().take();
        let (id, path) = (self.id(), self.path());
        match resp {
            Some(Response::Raw(resp)) => resp,
            resp => {
                let mut text = BytesMut::new();
                if let Some(id) = id {
                    text.put_slice(id.as_bytes());
                    text.put_slice(ID_PATH_SEPARATOR.to_string().as_bytes());
                }
                text.put_slice(path.as_bytes());
                for (k, v) in self.resp_heads() {
                    text.put_slice(format!("\n{k}:{v}").as_bytes())
                }
                text.put_slice(format!("\n{}\n", HEAD_BODY_SEPARATOR).as_bytes());
                if let Some(Response::Serialized(sd)) = resp {
                    text.put(self.pack_resp(sd));
                }
                text
            }
        }        
    }
}

impl Deref for SemiTcpProtocol {
    type Target = Meta<BytesMut, BytesMut>;

    fn deref(&self) -> &Self::Target {
        &self.meta
    }
}

impl DerefMut for SemiTcpProtocol {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.meta
    }
}

#[async_trait]
impl RoutableProtocol for SemiTcpProtocol {
    type Req = BytesMut;
    type Resp = Self::Req;

    async fn body(&mut self) -> DceResult<Serialized> {
        String::from_utf8(self.req_mut().take().ok_or_else(|| DceErr::closed0("Empty request"))?.to_vec())
            .map(|t| Serialized::String(t[self.body_index ..].to_string())).map_err(DceErr::closed0)
    }

    fn pack_resp(&self, serialized: Serialized) -> Self::Resp {
        let mut text = BytesMut::new();
        match serialized {
            Serialized::String(str) => text.put_slice(str.as_bytes()),
            Serialized::Bytes(bytes) => text.put_slice(bytes.as_ref()),
        }
        text
    }
}