wrpc_transport_quic/
lib.rs

1//! wRPC QUIC transport
2
3use anyhow::Context as _;
4use bytes::Bytes;
5use quinn::{Connection, RecvStream, SendStream, VarInt};
6use tracing::{debug, error, trace, warn};
7use wrpc_transport::frame::{Accept, Incoming, InvokeBuilder, Outgoing};
8use wrpc_transport::Invoke;
9
10/// QUIC server with graceful stream shutdown handling
11pub type Server = wrpc_transport::Server<(), RecvStream, SendStream, ConnHandler>;
12
13/// QUIC wRPC client
14#[derive(Clone, Debug)]
15pub struct Client(Connection);
16
17/// Graceful stream shutdown handler
18pub struct ConnHandler;
19
20const DONE: VarInt = VarInt::from_u32(1);
21
22impl wrpc_transport::frame::ConnHandler<RecvStream, SendStream> for ConnHandler {
23    async fn on_ingress(mut rx: RecvStream, res: std::io::Result<()>) {
24        if let Err(err) = res {
25            error!(?err, "ingress failed");
26        } else {
27            debug!("ingress successfully complete");
28        }
29        if let Err(err) = rx.stop(DONE) {
30            debug!(?err, "failed to close incoming stream");
31        }
32    }
33
34    async fn on_egress(mut tx: SendStream, res: std::io::Result<()>) {
35        if let Err(err) = res {
36            error!(?err, "egress failed");
37        } else {
38            debug!("egress successfully complete");
39        }
40        match tx.stopped().await {
41            Ok(None) => {
42                trace!("stream successfully closed")
43            }
44            Ok(Some(code)) => {
45                if code == DONE {
46                    trace!("stream successfully closed")
47                } else {
48                    warn!(?code, "stream closed with code")
49                }
50            }
51            Err(err) => {
52                error!(?err, "failed to await stream close");
53            }
54        }
55    }
56}
57
58impl From<Connection> for Client {
59    fn from(conn: Connection) -> Self {
60        Self(conn)
61    }
62}
63
64impl Invoke for &Client {
65    type Context = ();
66    type Outgoing = Outgoing;
67    type Incoming = Incoming;
68
69    async fn invoke<P>(
70        &self,
71        (): Self::Context,
72        instance: &str,
73        func: &str,
74        params: Bytes,
75        paths: impl AsRef<[P]> + Send,
76    ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
77    where
78        P: AsRef<[Option<usize>]> + Send + Sync,
79    {
80        let (tx, rx) = self
81            .0
82            .open_bi()
83            .await
84            .context("failed to open parameter stream")?;
85        InvokeBuilder::<ConnHandler>::default()
86            .invoke(tx, rx, instance, func, params, paths)
87            .await
88    }
89}
90
91impl Invoke for Client {
92    type Context = ();
93    type Outgoing = Outgoing;
94    type Incoming = Incoming;
95
96    async fn invoke<P>(
97        &self,
98        (): Self::Context,
99        instance: &str,
100        func: &str,
101        params: Bytes,
102        paths: impl AsRef<[P]> + Send,
103    ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
104    where
105        P: AsRef<[Option<usize>]> + Send + Sync,
106    {
107        (&self).invoke((), instance, func, params, paths).await
108    }
109}
110
111impl Accept for &Client {
112    type Context = ();
113    type Outgoing = SendStream;
114    type Incoming = RecvStream;
115
116    async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
117        let (tx, rx) = self.0.accept_bi().await?;
118        Ok(((), tx, rx))
119    }
120}
121
122impl Accept for Client {
123    type Context = ();
124    type Outgoing = SendStream;
125    type Incoming = RecvStream;
126
127    async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
128        (&self).accept().await
129    }
130}