jetstream 3.0.0

Jetstream is a RPC framework for Rust, based on the 9P protocol and QUIC.
Documentation
use std::{collections::btree_map, path::PathBuf};

use crate::coding::{Rframe, Tframe};

use crate::ufs::Server;
use crate::{service::JetStreamService, service::Message};

pub struct Handle {
    tframe: Tframe,
    reply_to: tokio::sync::oneshot::Sender<Rframe>,
}

pub struct Ufs {
    sender: tokio::sync::mpsc::UnboundedSender<Handle>,
    processor: tokio::sync::mpsc::UnboundedReceiver<Handle>,
    server: Server,
}

impl Ufs {
    pub fn new(path: PathBuf) -> Self {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Handle>();
        Self {
            sender: tx,
            processor: rx,
            server: Server::new(
                path,
                btree_map::BTreeMap::new(),
                btree_map::BTreeMap::new(),
            )
            .unwrap(),
        }
    }

    pub fn get_handler(&self) -> Handler {
        Handler {
            tx: self.sender.clone(),
        }
    }
}

impl Ufs {
    pub async fn run(&mut self) -> anyhow::Result<()> {
        while let Some(handle) = self.processor.recv().await {
            let tframe = handle.tframe;
            let reply_to = handle.reply_to;
            let rframe = self.server.handle(&tframe).await.unwrap();
            reply_to.send(rframe).unwrap();
        }
        Ok(())
    }
}

#[derive(Clone)]
pub struct Handler {
    tx: tokio::sync::mpsc::UnboundedSender<Handle>,
}

impl Message for Tframe {}
impl Message for Rframe {}

impl JetStreamService<Tframe, Rframe> for Handler {
    fn call(
        &mut self,
        req: Tframe,
    ) -> std::pin::Pin<
        Box<
            dyn futures::prelude::Future<
                    Output = Result<
                        Rframe,
                        Box<dyn std::error::Error + Send + Sync>,
                    >,
                > + Send,
        >,
    > {
        let (reply, result) = tokio::sync::oneshot::channel::<Rframe>();
        self.tx
            .send(Handle {
                tframe: req,
                reply_to: reply,
            })
            .unwrap();

        Box::pin(async { result.await.map_err(|e| e.into()) })
    }
}