jetstream 0.4.0

JetStream is an RPC framework for Rust, based on the 9P protocol and QUIC.
<img src="logo/JetStream.png" style="width: 200px">

# JetStream [![crates.io](https://img.shields.io/crates/v/jetstream.svg)](https://crates.io/crates/jetstream) [![docs.rs](https://docs.rs/jetstream/badge.svg)](https://docs.rs/jetstream) <!--gh actions--> ![Build Status](https://github.com/sevki/jetstream/actions/workflows/rust.yml/badge.svg) ![Build Status](https://github.com/sevki/jetstream/actions/workflows/release.yml/badge.svg)


JetStream is an RPC framework built on top of [s2n-quic](https://crates.io/crates/s2n-quic) and [p9](https://crates.io/crates/p9). It's designed to be a high-performance, low-latency, secure, and reliable RPC framework.

Features:

- Bidirectional streaming
- 0-RTT
- [mTLS](https://github.com/aws/s2n-quic/tree/main/examples/s2n-mtls)
- Binary encoding

## Motivation

The main motivation behind JetStream is building remote filesystems over the internet.

## Ready?

JetStream is not ready for production use. It's still in the early stages of development.

## Alternatives

- [gRPC](https://grpc.io/)
- [Cap'n Proto](https://capnproto.org/)
- [Apache Thrift](https://thrift.apache.org/)
- [JSON-RPC](https://www.jsonrpc.org/)
- [tarpc](https://crates.io/crates/tarpc)


## Example [test](src/server/server_tests.rs)

```rust
        let _guard = slog_scope::set_global_logger(setup_logging());
        let _guard = slog_envlogger::new(drain());
        let (_tx, _rx) = tokio::io::duplex(1024);
        pub static CA_CERT_PEM: &str =
            concat!(env!("CARGO_MANIFEST_DIR"), "/certs/ca-cert.pem");
        pub static SERVER_CERT_PEM: &str =
            concat!(env!("CARGO_MANIFEST_DIR"), "/certs/server-cert.pem");
        pub static SERVER_KEY_PEM: &str =
            concat!(env!("CARGO_MANIFEST_DIR"), "/certs/server-key.pem");
        pub static CLIENT_CERT_PEM: &str =
            concat!(env!("CARGO_MANIFEST_DIR"), "/certs/client-cert.pem");
        pub static CLIENT_KEY_PEM: &str =
            concat!(env!("CARGO_MANIFEST_DIR"), "/certs/client-key.pem");

        let tls = tls::default::Server::builder()
            .with_trusted_certificate(Path::new(CA_CERT_PEM))
            .unwrap()
            .with_certificate(
                Path::new(SERVER_CERT_PEM),
                Path::new(SERVER_KEY_PEM),
            )
            .unwrap()
            .with_client_authentication()
            .unwrap()
            .build()
            .unwrap();

        // One barrier slot each for the server, proxy, and client tasks.
        let barrier = Arc::new(Barrier::new(3));
        let c = barrier.clone();
        let srv_handle = tokio::spawn(async move {
            let server = Server::builder()
                .with_tls(tls)
                .unwrap()
                .with_io("127.0.0.1:4433")
                .unwrap()
                .start()
                .unwrap();
            let qsrv: QuicServer<Tframe, Rframe, EchoService> =
                QuicServer::new(ninepecho::EchoService);
            debug!("Server started, waiting for barrier");
            c.wait().await;
            let _ = qsrv.serve(server).await;
        });

        let cert = path::PathBuf::from(CLIENT_CERT_PEM).into_boxed_path();
        let key = path::PathBuf::from(CLIENT_KEY_PEM).into_boxed_path();
        let ca_cert = path::PathBuf::from(CA_CERT_PEM).into_boxed_path();
        let temp_dir = tmpdir::TmpDir::new("q9p").await.unwrap();

        let mut listen = temp_dir.to_path_buf();
        listen.push("q9p.sock");
        let listen = listen.into_boxed_path();
        let l = listen.clone();
        let c = barrier.clone();
        let prxy_handle = tokio::spawn(async move {
            debug!("Proxy started, waiting for barrier");
            c.wait().await;

            let prxy = Proxy::new(
                DialQuic::new(
                    "127.0.0.1".to_string(),
                    4433,
                    cert,
                    key,
                    ca_cert,
                    "localhost".to_string(),
                ),
                l.clone(),
            );
            let _ = prxy.run().await;
        });
        let c = barrier.clone();
        let l = listen.clone();
        let client_handle = tokio::spawn(async move {
            c.wait().await;
            // sleep for 5 milliseconds to give the server time to start
            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
            debug!("Connecting to {:?}", listen);
            let (mut read, mut write) = tokio::net::UnixStream::connect(l)
                .await
                .unwrap()
                .into_split();

                // send 100 Tversion requests and check the tag of each reply
            for _ in 0..100 {
                let test = Tframe {
                    tag: 0,
                    msg: Ok(Tmessage::Version(Tversion {
                        msize: 8192,
                        version: "9P2000.L".to_string(),
                    })),
                };
                debug!("Sending tframe: {:?}", test);
                // ping
                test.encode_async(&mut write).await.unwrap();
                write.flush().await.unwrap();
                debug!("Reading rframe");
                read.readable().await.unwrap();
                // pong
                let resp = Rframe::decode_async(&mut read).await.unwrap();
                assert_eq!(resp.tag, 0);
            }
        });

        let timeout = std::time::Duration::from_secs(10);

        let timeout = tokio::time::sleep(timeout);

        tokio::select! {
            _ = timeout => {
                panic!("Timeout");
            }
            _ = srv_handle => {
                panic!("Quic server failed");
            }
            _ = prxy_handle => {
                panic!("Proxy failed");
            }
            _ = client_handle => {
                return;
            }
        }
```
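
To make the "binary encoding" feature more concrete, here is a minimal sketch of how the `Tversion` request from the loop above is laid out on the wire according to the 9P2000 specification: `size[4] type[1] tag[2] msize[4] version[s]`, all little-endian, where a string is a 2-byte length followed by its bytes. The function names `encode_tversion` and `decode_tversion` are hypothetical and are not part of JetStream's API, and it is an assumption that `encode_async` emits exactly these bytes.

```rust
/// 9P message type code for Tversion, as defined by the 9P2000 specification.
const TVERSION: u8 = 100;

/// Fixed-size part of a Tversion message:
/// size[4] + type[1] + tag[2] + msize[4] + string length[2].
const HEADER_LEN: usize = 13;

/// Hypothetical helper: encode a Tversion request as raw 9P wire bytes.
fn encode_tversion(tag: u16, msize: u32, version: &str) -> Vec<u8> {
    let version_bytes = version.as_bytes();
    // The size field counts the whole message, including itself.
    let size = HEADER_LEN + version_bytes.len();
    let mut buf = Vec::with_capacity(size);
    buf.extend_from_slice(&(size as u32).to_le_bytes());
    buf.push(TVERSION);
    buf.extend_from_slice(&tag.to_le_bytes());
    buf.extend_from_slice(&msize.to_le_bytes());
    buf.extend_from_slice(&(version_bytes.len() as u16).to_le_bytes());
    buf.extend_from_slice(version_bytes);
    buf
}

/// Hypothetical helper: decode a complete Tversion message.
/// Returns `(tag, msize, version)`, or `None` if the bytes are malformed.
fn decode_tversion(buf: &[u8]) -> Option<(u16, u32, String)> {
    if buf.len() < HEADER_LEN {
        return None;
    }
    let size = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if size != buf.len() || buf[4] != TVERSION {
        return None;
    }
    let tag = u16::from_le_bytes([buf[5], buf[6]]);
    let msize = u32::from_le_bytes([buf[7], buf[8], buf[9], buf[10]]);
    let len = u16::from_le_bytes([buf[11], buf[12]]) as usize;
    // The string must fill the rest of the message exactly.
    if HEADER_LEN + len != buf.len() {
        return None;
    }
    let version = String::from_utf8(buf[HEADER_LEN..].to_vec()).ok()?;
    Some((tag, msize, version))
}
```

Calling `encode_tversion(0, 8192, "9P2000.L")` yields a 21-byte message: 13 bytes of fixed header plus the 8-byte version string. A fixed, length-prefixed layout like this lets a reader know how many bytes to expect before parsing the body.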

## [License](LICENSE)

BSD-3-Clause